You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/05 17:55:11 UTC
[arrow-datafusion] branch master updated: Minor: Update docstrings and comments to aggregate code (#4489)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 394c5eebf Minor: Update docstrings and comments to aggregate code (#4489)
394c5eebf is described below
commit 394c5eebf0cfb080698485bd32a8d2fb2230fb80
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Dec 5 12:55:05 2022 -0500
Minor: Update docstrings and comments to aggregate code (#4489)
* Minor: add some comments to aggregate code
* Fix typos
Co-authored-by: jakevin <ja...@gmail.com>
* fix typo
Co-authored-by: jakevin <ja...@gmail.com>
Co-authored-by: jakevin <ja...@gmail.com>
---
.../core/src/physical_plan/aggregates/hash.rs | 8 ++++-
datafusion/expr/src/accumulator.rs | 36 +++++++++++++++++-----
2 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index 8bf929630..4ab2a0a06 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -468,6 +468,11 @@ impl std::fmt::Debug for Accumulators {
}
/// Create a RecordBatch with all group keys and accumulator' states or values.
+///
+/// The output looks like
+/// ```text
+/// gby_expr1, gby_expr2, ... agg1, agg2, ...
+/// ```
fn create_batch_from_map(
mode: &AggregateMode,
accumulators: &Accumulators,
@@ -493,6 +498,7 @@ fn create_batch_from_map(
}
}
+ // First, output all group by exprs
let mut columns = (0..num_group_expr)
.map(|i| {
ScalarValue::iter_to_array(
@@ -504,7 +510,7 @@ fn create_batch_from_map(
})
.collect::<Result<Vec<_>>>()?;
- // add state / evaluated arrays
+ // next, output aggregates: either intermediate state or final output
for (x, &state_len) in acc_data_types.iter().enumerate() {
for y in 0..state_len {
match mode {
diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index 3d107c0c4..5b8269ee2 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -31,17 +31,30 @@ use std::fmt::Debug;
/// * update its state from multiple accumulators' states via `merge_batch`
/// * compute the final value from its internal state via `evaluate`
pub trait Accumulator: Send + Sync + Debug {
- /// Returns the state of the accumulator at the end of the accumulation.
- /// in the case of an average on which we track `sum` and `n`, this function should return a vector
- /// of two values, sum and n.
+ /// Returns the partial intermediate state of the accumulator. This
+ /// partial state is serialized as `Arrays` and then combined with
+ /// other partial states from different instances of this
+ /// accumulator (that ran on different partitions, for
+ /// example).
+ ///
+ /// The state can be a different type than the output of the
+ /// [`Accumulator`].
+ ///
+ /// See [`Self::merge_batch`] for more details on the merging process.
+ ///
+ /// For example, in the case of an average, for which we track `sum` and `n`,
+ /// this function should return a vector of two values, sum and n.
fn state(&self) -> Result<Vec<AggregateState>>;
/// Updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
- /// Retracts an update (caused by the given inputs) to accumulator's state.
- /// Inverse operation of the `update_batch` operation. This method must be
- /// for accumulators that should support bounded OVER aggregates.
+ /// Retracts an update (caused by the given inputs) to
+ /// accumulator's state.
+ ///
+ /// This is the inverse operation of [`Self::update_batch`] and is used
+ /// to incrementally calculate window aggregates where the OVER
+ /// clause defines a bounded window.
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
// TODO add retract for all accumulators
Err(DataFusionError::Internal(
@@ -49,10 +62,17 @@ pub trait Accumulator: Send + Sync + Debug {
))
}
- /// updates the accumulator's state from a vector of states.
+ /// Updates the accumulator's state from an `Array` containing one
+ /// or more intermediate values.
+ ///
+ /// The `states` array passed was formed by concatenating the
+ /// results of calling [`Self::state`] on zero or more other accumulator
+ /// instances.
+ ///
+ /// `states` is an array of the same types as returned by [`Self::state`].
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
- /// returns its value based on its current state.
+ /// Returns the final aggregate value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
/// Allocated size required for this accumulator, in bytes, including `Self`.