You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/05 17:55:11 UTC

[arrow-datafusion] branch master updated: Minor: Update docstrings and comments to aggregate code (#4489)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 394c5eebf Minor: Update docstrings and comments to aggregate code (#4489)
394c5eebf is described below

commit 394c5eebf0cfb080698485bd32a8d2fb2230fb80
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Dec 5 12:55:05 2022 -0500

    Minor: Update docstrings and comments to aggregate code (#4489)
    
    * Minor: add some comments to aggregate code
    
    * Fix typos
    
    Co-authored-by: jakevin <ja...@gmail.com>
    
    * fix typo
    
    Co-authored-by: jakevin <ja...@gmail.com>
    
    Co-authored-by: jakevin <ja...@gmail.com>
---
 .../core/src/physical_plan/aggregates/hash.rs      |  8 ++++-
 datafusion/expr/src/accumulator.rs                 | 36 +++++++++++++++++-----
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index 8bf929630..4ab2a0a06 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -468,6 +468,11 @@ impl std::fmt::Debug for Accumulators {
 }
 
 /// Create a RecordBatch with all group keys and accumulator' states or values.
+///
+/// The output looks like
+/// ```text
+/// gby_expr1, gby_expr2, ... agg_1, agg_2, ...
+/// ```
 fn create_batch_from_map(
     mode: &AggregateMode,
     accumulators: &Accumulators,
@@ -493,6 +498,7 @@ fn create_batch_from_map(
         }
     }
 
+    // First, output all group by exprs
     let mut columns = (0..num_group_expr)
         .map(|i| {
             ScalarValue::iter_to_array(
@@ -504,7 +510,7 @@ fn create_batch_from_map(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    // add state / evaluated arrays
+    // next, output aggregates: either intermediate state or final output
     for (x, &state_len) in acc_data_types.iter().enumerate() {
         for y in 0..state_len {
             match mode {
diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index 3d107c0c4..5b8269ee2 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -31,17 +31,30 @@ use std::fmt::Debug;
 /// * update its state from multiple accumulators' states via `merge_batch`
 /// * compute the final value from its internal state via `evaluate`
 pub trait Accumulator: Send + Sync + Debug {
-    /// Returns the state of the accumulator at the end of the accumulation.
-    /// in the case of an average on which we track `sum` and `n`, this function should return a vector
-    /// of two values, sum and n.
+    /// Returns the partial intermediate state of the accumulator. This
+    /// partial state is serialized as `Arrays` and then combined with
+    /// other partial states from different instances of this
+    /// accumulator (that ran on different partitions, for
+    /// example).
+    ///
+    /// The state can be a different type than the output of the
+    /// [`Accumulator`].
+    ///
+    /// See [`Self::merge_batch`] for more details on the merging process.
+    ///
+    /// For example, in the case of an average, for which we track `sum` and `n`,
+    /// this function should return a vector of two values, sum and n.
     fn state(&self) -> Result<Vec<AggregateState>>;
 
     /// Updates the accumulator's state from a vector of arrays.
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
 
-    /// Retracts an update (caused by the given inputs) to accumulator's state.
-    /// Inverse operation of the `update_batch` operation. This method must be
-    /// for accumulators that should support bounded OVER aggregates.
+    /// Retracts an update (caused by the given inputs) to
+    /// accumulator's state.
+    ///
+    /// This is the inverse operation of [`Self::update_batch`] and is used
+    /// to incrementally calculate window aggregates where the OVER
+    /// clause defines a bounded window.
     fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
         // TODO add retract for all accumulators
         Err(DataFusionError::Internal(
@@ -49,10 +62,17 @@ pub trait Accumulator: Send + Sync + Debug {
         ))
     }
 
-    /// updates the accumulator's state from a vector of states.
+    /// Updates the accumulator's state from an `Array` containing one
+    /// or more intermediate values.
+    ///
+    /// The `states` array passed was formed by concatenating the
+    /// results of calling [`Self::state`] on zero or more other accumulator
+    /// instances.
+    ///
+    /// `states` is an array of the same types as returned by [`Self::state`].
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
 
-    /// returns its value based on its current state.
+    /// Returns the final aggregate value based on its current state.
     fn evaluate(&self) -> Result<ScalarValue>;
 
     /// Allocated size required for this accumulator, in bytes, including `Self`.