You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/07/03 07:57:35 UTC

[arrow-datafusion] branch hash_agg_spike created (now bcd9e8ed56)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


      at bcd9e8ed56 WIP count

This branch includes the following new commits:

     new 90b38b0d46 Refactor Decimal128 averaging code to be vectorizable (and easier to read)
     new e02c35d649 POC: Demonstrate new GroupHashAggregate stream approach
     new 337353810d complete accumulator
     new d7c1581fde touchups
     new 814b1fc29b Add comments
     new 82c9205200 Update comments and simplify code
     new 6680636892 factor out accumulate
     new 892e440538 split nullable/non nullable handling
     new 0cfa1bc3c9 Refactor out accumulation in average
     new 79b1bc9041 Move accumulator to their own function
     new c3496ccd6d update more comments
     new 190c43c148 Begin writing tests for accumulate
     new 83ff9cf83e more tets
     new 889345c446 more tests
     new ccfd3ce65f comments
     new 35b413239a Implement fuzz testing
     new bcd9e8ed56 WIP count

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-datafusion] 16/17: Implement fuzz testing

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 35b413239a1fdad1663231cddc73501748404835
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 08:07:16 2023 -0400

    Implement fuzz testing
---
 .../src/aggregate/groups_accumulator/accumulate.rs | 142 +++++++++++++++------
 1 file changed, 102 insertions(+), 40 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index 8ea22acd8a..879c1a3a66 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -159,27 +159,11 @@ mod test {
     use super::*;
 
     use arrow_array::UInt32Array;
+    use rand::{rngs::ThreadRng, Rng};
 
     #[test]
     fn accumulate_no_filter() {
-        let fixture = Fixture::new();
-        let mut accumulated = vec![];
-
-        accumulate_all(
-            &fixture.group_indices,
-            &fixture.values_array(),
-            fixture.opt_filter(),
-            |group_index, value| accumulated.push((group_index, value)),
-        );
-
-        // Should have see all indexes and values in order
-        accumulated
-            .into_iter()
-            .enumerate()
-            .for_each(|(i, (group_index, value))| {
-                assert_eq!(group_index, fixture.group_indices[i]);
-                assert_eq!(value, fixture.values[i]);
-            })
+        Fixture::new().accumulate_all_test()
     }
 
     #[test]
@@ -199,27 +183,7 @@ mod test {
 
     #[test]
     fn accumulate_nullable_no_filter() {
-        let fixture = Fixture::new();
-        let mut accumulated = vec![];
-
-        accumulate_all_nullable(
-            &fixture.group_indices,
-            &fixture.values_with_nulls_array(),
-            fixture.opt_filter(),
-            |group_index, value, is_valid| {
-                let value = if is_valid { Some(value) } else { None };
-                accumulated.push((group_index, value));
-            },
-        );
-
-        // Should have see all indexes and values in order
-        accumulated
-            .into_iter()
-            .enumerate()
-            .for_each(|(i, (group_index, value))| {
-                assert_eq!(group_index, fixture.group_indices[i]);
-                assert_eq!(value, fixture.values_with_nulls[i]);
-            })
+        Fixture::new().accumulate_all_nullable_test()
     }
 
     #[test]
@@ -239,7 +203,31 @@ mod test {
 
     // TODO: filter testing with/without null
 
-    // fuzz testing
+    #[test]
+    fn accumulate_fuzz() {
+        let mut rng = rand::thread_rng();
+        for _ in 0..100 {
+            Fixture::new_random(&mut rng).accumulate_all_test();
+        }
+    }
+
+    #[test]
+    fn accumulate_nullable_fuzz() {
+        let mut rng = rand::thread_rng();
+        let mut nullable_called = false;
+        for _ in 0..100 {
+            let fixture = Fixture::new_random(&mut rng);
+            // sometimes the random generator will create an array
+            // with no nulls so avoid panic'ing in tests
+            if fixture.values_with_nulls.iter().any(|v| v.is_none()) {
+                nullable_called = true;
+                fixture.accumulate_all_nullable_test();
+            } else {
+                fixture.accumulate_all_test();
+            }
+            assert!(nullable_called);
+        }
+    }
 
     /// Values for testing (there are enough values to exercise the 64 bit chunks
     struct Fixture {
@@ -269,6 +257,34 @@ mod test {
             }
         }
 
+        fn new_random(rng: &mut ThreadRng) -> Self {
+            let num_groups: usize = rng.gen_range(0..1000);
+            let group_indices: Vec<usize> = (0..num_groups).map(|_| rng.gen()).collect();
+
+            let values: Vec<u32> = (0..num_groups).map(|_| rng.gen()).collect();
+
+            // random values with random number and location of nulls
+            // random null percentage
+            let null_pct: f32 = rng.gen_range(0.0..1.0);
+            let values_with_nulls: Vec<Option<u32>> = (0..num_groups)
+                .map(|_| {
+                    let is_null = null_pct < rng.gen_range(0.0..1.0);
+                    if is_null {
+                        None
+                    } else {
+                        Some(rng.gen())
+                    }
+                })
+                .collect();
+
+            Self {
+                group_indices,
+                values,
+                values_with_nulls,
+                opt_filter: None,
+            }
+        }
+
         /// returns `Self::values` an Array
         fn values_array(&self) -> UInt32Array {
             UInt32Array::from(self.values.clone())
@@ -282,5 +298,51 @@ mod test {
         fn opt_filter(&self) -> Option<&BooleanArray> {
             self.opt_filter.as_ref()
         }
+
+        // Calls `accumulate_all` with group_indices, values, and
+        // opt_filter and ensures it calls the right values
+        fn accumulate_all_test(&self) {
+            let mut accumulated = vec![];
+            accumulate_all(
+                &self.group_indices,
+                &self.values_array(),
+                self.opt_filter(),
+                |group_index, value| accumulated.push((group_index, value)),
+            );
+
+            // Should have seen all indexes and values in order
+            accumulated
+                .into_iter()
+                .enumerate()
+                .for_each(|(i, (group_index, value))| {
+                    assert_eq!(group_index, self.group_indices[i]);
+                    assert_eq!(value, self.values[i]);
+                })
+        }
+
+        // Calls `accumulate_all_nullable` with group_indices, values,
+        // and opt_filter and ensures it calls the right values
+        fn accumulate_all_nullable_test(&self) {
+            let mut accumulated = vec![];
+
+            accumulate_all_nullable(
+                &self.group_indices,
+                &self.values_with_nulls_array(),
+                self.opt_filter(),
+                |group_index, value, is_valid| {
+                    let value = if is_valid { Some(value) } else { None };
+                    accumulated.push((group_index, value));
+                },
+            );
+
+            // Should have seen all indexes and values in order
+            accumulated
+                .into_iter()
+                .enumerate()
+                .for_each(|(i, (group_index, value))| {
+                    assert_eq!(group_index, self.group_indices[i]);
+                    assert_eq!(value, self.values_with_nulls[i]);
+                })
+        }
     }
 }


[arrow-datafusion] 15/17: comments

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit ccfd3ce65f2418bd6d284f2d0ead5958c051dbd0
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 07:16:26 2023 -0400

    comments
---
 .../physical-expr/src/aggregate/groups_accumulator/accumulate.rs      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index 24e4b04661..8ea22acd8a 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -74,6 +74,7 @@ pub fn accumulate_all<T, F>(
     T: ArrowNumericType + Send,
     F: FnMut(usize, T::Native) + Send,
 {
+    // Given performance is critical, assert if the wrong flavor is called
     assert_eq!(
         values.null_count(), 0,
         "Called accumulate_all with nullable array (call accumulate_all_nullable instead)"
@@ -108,7 +109,8 @@ pub fn accumulate_all_nullable<T, F>(
     F: FnMut(usize, T::Native, bool) + Send,
 {
     // AAL TODO handle filter values
-    // TODO combine the null mask from values and opt_filter
+
+    // Given performance is critical, assert if the wrong flavor is called
     let valids = values
         .nulls()
         .expect("Called accumulate_all_nullable with non-nullable array (call accumulate_all instead)");


[arrow-datafusion] 05/17: Add comments

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 814b1fc29b8b7a30cc1a9786c514f8fe5fe67c5b
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 04:59:30 2023 -0400

    Add comments
---
 benchmarks/Cargo.toml                              |   1 +
 benchmarks/src/bin/tpch.rs                         |   3 +
 .../core/src/physical_plan/aggregates/row_hash2.rs | 136 ++++++++++++++++-----
 3 files changed, 107 insertions(+), 33 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 1d360739d8..f8ca0b2496 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -37,6 +37,7 @@ arrow = { workspace = true }
 datafusion = { path = "../datafusion/core", version = "27.0.0" }
 env_logger = "0.10"
 futures = "0.3"
+log = "^0.4"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 parquet = { workspace = true }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 32359dc1f8..d18dd38b9b 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
+use log::info;
 
+use arrow::util::pretty::pretty_format_batches;
 use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
@@ -235,6 +237,7 @@ async fn benchmark_query(
         let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
         let ms = elapsed.as_secs_f64() * 1000.0;
         millis.push(ms);
+        info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
index 2eb058d8c5..3e9dbfe0cf 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
@@ -58,66 +58,136 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
-/// Grouping aggregate
+/// Hash based Grouping Aggregator
 ///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
-/// - [GroupsAccumulator] to store per group aggregates
+/// # Design Goals
 ///
-/// The architecture is the following:
+/// This structure is designed so that as much as possible can be
+/// vectorized (done in a tight loop).
 ///
-/// TODO
+/// # Architecture
 ///
-/// [WordAligned]: datafusion_row::layout
+/// ```text
+///
+/// stores "group       stores group values,       internally stores aggregate
+///    indexes"          in arrow_row format         values, for all groups
+///
+/// ┌─────────────┐      ┌────────────┐    ┌──────────────┐       ┌──────────────┐
+/// │   ┌─────┐   │      │ ┌────────┐ │    │┌────────────┐│       │┌────────────┐│
+/// │   │  5  │   │ ┌────┼▶│  "A"   │ │    ││accumulator ││       ││accumulator ││
+/// │   ├─────┤   │ │    │ ├────────┤ │    ││     0      ││       ││     N      ││
+/// │   │  9  │   │ │    │ │  "Z"   │ │    ││ ┌────────┐ ││       ││ ┌────────┐ ││
+/// │   └─────┘   │ │    │ └────────┘ │    ││ │ state  │ ││       ││ │ state  │ ││
+/// │     ...     │ │    │            │    ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ │ ││
+/// │   ┌─────┐   │ │    │    ...     │    ││ │├─────┤ │ ││       ││ │├─────┤ │ ││
+/// │   │  1  │───┼─┘    │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+/// │   ├─────┤   │      │            │    ││ │        │ ││       ││ │        │ ││
+/// │   │ 13  │───┼─┐    │ ┌────────┐ │    ││ │  ...   │ ││       ││ │  ...   │ ││
+/// │   └─────┘   │ └────┼▶│  "Q"   │ │    ││ │        │ ││       ││ │        │ ││
+/// └─────────────┘      │ └────────┘ │    ││ │┌─────┐ │ ││       ││ │┌─────┐ │ ││
+///                      │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+///                      └────────────┘    ││ └────────┘ ││       ││ └────────┘ ││
+///                                        │└────────────┘│       │└────────────┘│
+///                                        └──────────────┘       └──────────────┘
+///
+///       map            group_values                   accumulators
+///  (Hash Table)
+///
+///  ```
+///
+/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`,
+/// `group_values` will store the distinct values of `z`. There will
+/// be one accumulator for `COUNT(x)`, specialized for the data type
+/// of `x` and one accumulator for `SUM(y)`, specialized for the data
+/// type of `y`.
+///
+/// # Description
+///
+/// The hash table stores "group indices", one for each (distinct)
+/// group value.
+///
+/// The group values are stored in [`Self::group_values`] at the
+/// corresponding group index.
+///
+/// The accumulator state (e.g partial sums) is managed by and stored
+/// by a [`GroupsAccumulator`] accumulator. There is one accumulator
+/// per aggregate expression (COUNT, AVG, etc) in the
+/// query. Internally, each `GroupsAccumulator` manages the state for
+/// multiple groups, and is passed `group_indexes` during update. Note
+/// that the accumulator state is not managed by this operator (e.g. in
+/// the hash table).
 pub(crate) struct GroupedHashAggregateStream2 {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
 
     /// Accumulators, one for each `AggregateExpr` in the query
+    ///
+    /// For example, if the query has aggregates, `SUM(x)`,
+    /// `COUNT(y)`, there will be two accumulators, each one
+    /// specialized for that particular aggregate and its input types
     accumulators: Vec<Box<dyn GroupsAccumulator>>,
-    /// Arguments expressionf or each accumulator
+
+    /// Arguments for each accumulator.
     aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression to evaluate for each aggregate
+
+    /// Optional filter expression to evaluate, one for each
+    /// aggregate. If present, only those rows for which the filter
+    /// evaluates to true should be included in the aggregate results.
+    ///
+    /// For example, for an aggregate like `SUM(x) FILTER (WHERE x > 100)`,
+    /// the filter expression is `x > 100`.
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
 
     /// Converter for each row
     row_converter: RowConverter,
+
+    /// GROUP BY expressions
     group_by: PhysicalGroupBy,
 
     /// The memory reservation for this grouping
     reservation: MemoryReservation,
 
-    /// Logically maps group values to a group_index `group_states`
+    /// Logically maps group values to a group_index in
+    /// [`Self::group_values`] and in each accumulator
     ///
     /// Uses the raw API of hashbrown to avoid actually storing the
-    /// keys in the table
+    /// keys (group values) in the table
     ///
     /// keys: u64 hashes of the GroupValue
-    /// values: (hash, index into `group_states`)
+    /// values: (hash, group_index)
     map: RawTable<(u64, usize)>,
 
-    /// The actual group by values, stored in arrow Row format
-    /// the index of group_by_values is the index
-    /// https://github.com/apache/arrow-rs/issues/4466
-    group_by_values: Vec<OwnedRow>,
+    /// The actual group by values, stored in arrow [`Row`] format. The
+    /// group_values[i] holds the group value for group_index `i`.
+    ///
+    /// The row format is used to compare group keys quickly. This is
+    /// especially important for multi-column group keys.
+    ///
+    /// TODO, make this Rows (rather than Vec<OwnedRow>) to reduce
+    /// allocations once
+    /// https://github.com/apache/arrow-rs/issues/4466 is available
+    group_values: Vec<OwnedRow>,
 
-    /// scratch space for the current Batch / Aggregate being
-    /// processed. Saved here to avoid reallocations
+    /// scratch space for the current input Batch being
+    /// processed. Reused across batches here to avoid reallocations
     current_group_indices: Vec<usize>,
 
-    /// generating input/output?
+    /// Tracks if this stream is generating input/output?
     exec_state: ExecutionState,
 
+    /// Execution metrics
     baseline_metrics: BaselineMetrics,
 
+    /// Random state for creating hashes
     random_state: RandomState,
-    /// size to be used for resulting RecordBatches
+
+    /// max rows in output RecordBatches
     batch_size: usize,
 }
 
 impl GroupedHashAggregateStream2 {
-    /// Create a new GroupedHashAggregateStream
+    /// Create a new GroupedHashAggregateStream2
     pub fn new(
         agg: &AggregateExec,
         context: Arc<TaskContext>,
@@ -137,15 +207,14 @@ impl GroupedHashAggregateStream2 {
         let mut aggregate_exprs = vec![];
         let mut aggregate_arguments = vec![];
 
-        // The expressions to evaluate the batch, one vec of expressions per aggregation.
-        // Assuming create_schema() always puts group columns in front of aggregation columns, we set
-        // col_idx_base to the group expression count.
-
+        // The arguments for each aggregate, one vec of expressions
+        // per aggregation.
         let all_aggregate_expressions = aggregates::aggregate_expressions(
             &agg.aggr_expr,
             &agg.mode,
             agg_group_by.expr.len(),
         )?;
+
         let filter_expressions = match agg.mode {
             AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
@@ -194,7 +263,7 @@ impl GroupedHashAggregateStream2 {
             group_by: agg_group_by,
             reservation,
             map,
-            group_by_values,
+            group_values: group_by_values,
             current_group_indices,
             exec_state,
             baseline_metrics,
@@ -204,7 +273,8 @@ impl GroupedHashAggregateStream2 {
     }
 }
 
-/// Crate a `GroupsAccumulator` for each of the aggregate_exprs to hold the aggregation state
+/// Create a [`GroupsAccumulator`] for each of the aggregate_exprs to
+/// hold the aggregation state
 fn create_accumulators(
     aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
 ) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
@@ -326,7 +396,7 @@ impl GroupedHashAggregateStream2 {
                 // TODO update *allocated based on size of the row
                 // that was just pushed into
                 // aggr_state.group_by_values
-                group_rows.row(row) == self.group_by_values[*group_idx].row()
+                group_rows.row(row) == self.group_values[*group_idx].row()
             });
 
             let group_idx = match entry {
@@ -335,8 +405,8 @@ impl GroupedHashAggregateStream2 {
                 //  1.2 Need to create new entry for the group
                 None => {
                     // Add new entry to aggr_state and save newly created index
-                    let group_idx = self.group_by_values.len();
-                    self.group_by_values.push(group_rows.row(row).owned());
+                    let group_idx = self.group_values.len();
+                    self.group_values.push(group_rows.row(row).owned());
 
                     // for hasher function, use precomputed hash value
                     self.map.insert_accounted(
@@ -382,7 +452,7 @@ impl GroupedHashAggregateStream2 {
                 .zip(input_values.iter())
                 .zip(filter_values.iter());
 
-            let total_num_groups = self.group_by_values.len();
+            let total_num_groups = self.group_values.len();
 
             for ((acc, values), opt_filter) in t {
                 let acc_size_pre = acc.size();
@@ -424,13 +494,13 @@ impl GroupedHashAggregateStream2 {
 impl GroupedHashAggregateStream2 {
     /// Create an output RecordBatch with all group keys and accumulator states/values
     fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
-        if self.group_by_values.is_empty() {
+        if self.group_values.is_empty() {
             let schema = self.schema.clone();
             return Ok(RecordBatch::new_empty(schema));
         }
 
         // First output rows are the groups
-        let groups_rows = self.group_by_values.iter().map(|owned_row| owned_row.row());
+        let groups_rows = self.group_values.iter().map(|owned_row| owned_row.row());
 
         let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;
 


[arrow-datafusion] 08/17: split nullable/non nullable handling

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 892e440538b984877b82edcc96b4e2e5f80e70dd
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 05:46:52 2023 -0400

    split nullable/non nullable handling
---
 datafusion/physical-expr/src/aggregate/average.rs | 139 ++++++++++++----------
 1 file changed, 79 insertions(+), 60 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 0dcff7ec9b..20ccadd7e8 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -417,9 +417,13 @@ impl RowAccumulator for AvgRowAccumulator {
     }
 }
 
-/// This function is called once per row to update the accumulator,
-/// for a `PrimitiveArray<T>` and is the inner loop for many
-/// GroupsAccumulators and thus performance critical.
+/// This function is called to update the accumulator state per row,
+/// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
+/// many GroupsAccumulators and thus performance critical.
+///
+/// I couldn't find any way to combine this with
+/// accumulate_all_nullable without having to pass in a is_null on
+/// every row.
 ///
 /// * `values`: the input arguments to the accumulator
 /// * `group_indices`:  To which groups do the rows in `values` belong, group id)
@@ -427,80 +431,95 @@ impl RowAccumulator for AvgRowAccumulator {
 ///
 /// `F`: The function to invoke for a non null input row to update the
 /// accumulator state. Called like `value_fn(group_index, value)
-///
-/// `FN`: The function to call for each null input row.  Called like
-/// `null_fn(group_index)
 fn accumulate_all<T, F, FN>(
     values: &PrimitiveArray<T>,
     group_indicies: &[usize],
     opt_filter: Option<&arrow_array::BooleanArray>,
     value_fn: F,
-    null_fn: FN,
 ) where
     T: ArrowNumericType + Send,
     F: Fn(usize, T::Native) + Send,
-    FN: Fn(usize) + Send,
 {
+    assert_eq!(
+        values.null_count(), 0,
+        "Called accumulate_all with nullable array (call accumulate_all_nullable instead)"
+    );
+
     // AAL TODO handle filter values
+
+    let data: &[T::Native] = values.values();
+    let iter = group_indicies.iter().zip(data.iter());
+    for (&group_index, &new_value) in iter {
+        value_fn(group_index, new_value)
+    }
+}
+
+
+/// This function is called to update the accumulator state per row,
+/// for a `PrimitiveArray<T>` with nulls. It is the inner loop for
+/// many GroupsAccumulators and thus performance critical.
+///
+/// * `values`: the input arguments to the accumulator
+/// * `group_indices`:  To which groups do the rows in `values` belong (group id)
+/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+///
+/// `F`: The function to invoke for an input row to update the
+/// accumulator state. Called like `value_fn(group_index, value,
+/// is_valid)`. NOTE the parameter is true when the value is VALID.
+fn accumulate_all_nullable<T, F, FN>(
+    values: &PrimitiveArray<T>,
+    group_indicies: &[usize],
+    opt_filter: Option<&arrow_array::BooleanArray>,
+    value_fn: F,
+) where
+    T: ArrowNumericType + Send,
+    F: Fn(usize, T::Native, bool) + Send,
+{
+     // AAL TODO handle filter values
     // TODO combine the null mask from values and opt_filter
-    let valids = values.nulls();
+    let valids = values
+        .nulls()
+        .expect("Called accumulate_all_nullable with non-nullable array (call accumulate_all instead)");
 
     // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
     let data: &[T::Native] = values.values();
 
-    match valids {
-        // no nulls
-        None => {
-            let iter = group_indicies.iter().zip(data.iter());
-            for (&group_index, &new_value) in iter {
-                value_fn(group_index, new_value)
-            }
-        }
-        // there are nulls, so handle them specially
-        Some(valids) => {
-            let group_indices_chunks = group_indicies.chunks_exact(64);
-            let data_chunks = data.chunks_exact(64);
-            let bit_chunks = valids.inner().bit_chunks();
-
-            let group_indices_remainder = group_indices_chunks.remainder();
-            let data_remainder = data_chunks.remainder();
-
-            group_indices_chunks
-                .zip(data_chunks)
-                .zip(bit_chunks.iter())
-                .for_each(|((group_index_chunk, data_chunk), mask)| {
-                    // index_mask has value 1 << i in the loop
-                    let mut index_mask = 1;
-                    group_index_chunk.iter().zip(data_chunk.iter()).for_each(
-                        |(&group_index, &new_value)| {
-                            // valid bit was set, real vale
-                            if (mask & index_mask) != 0 {
-                                value_fn(group_index, new_value);
-                            } else {
-                                null_fn(group_index)
-                            }
-                            index_mask <<= 1;
-                        },
-                    )
-                });
-
-            // handle any remaining bits (after the intial 64)
-            let remainder_bits = bit_chunks.remainder_bits();
-            group_indices_remainder
-                .iter()
-                .zip(data_remainder.iter())
-                .enumerate()
-                .for_each(|(i, (&group_index, &new_value))| {
-                    if remainder_bits & (1 << i) != 0 {
-                        value_fn(group_index, new_value)
-                    } else {
-                        null_fn(group_index)
-                    }
-                });
-        }
-    }
+    let group_indices_chunks = group_indicies.chunks_exact(64);
+    let data_chunks = data.chunks_exact(64);
+    let bit_chunks = valids.inner().bit_chunks();
+
+    let group_indices_remainder = group_indices_chunks.remainder();
+    let data_remainder = data_chunks.remainder();
+
+    group_indices_chunks
+        .zip(data_chunks)
+        .zip(bit_chunks.iter())
+        .for_each(|((group_index_chunk, data_chunk), mask)| {
+            // index_mask has value 1 << i in the loop
+            let mut index_mask = 1;
+            group_index_chunk.iter().zip(data_chunk.iter()).for_each(
+                |(&group_index, &new_value)| {
+                    // true if the valid bit is set, i.e. a real (non-null) value
+                    let is_valid = (mask & index_mask) != 0;
+                    value_fn(group_index, new_value, is_valid);
+                    index_mask <<= 1;
+                },
+            )
+        });
+
+    // handle any remaining bits (after the initial 64)
+    let remainder_bits = bit_chunks.remainder_bits();
+    group_indices_remainder
+        .iter()
+        .zip(data_remainder.iter())
+        .enumerate()
+        .for_each(|(i, (&group_index, &new_value))| {
+            let is_valid = remainder_bits & (1 << i) != 0;
+            value_fn(group_index, new_value, is_valid)
+        });
 }
 
+
 /// An accumulator to compute the average of PrimitiveArray<T>.
 /// Stores values as native types, and does overflow checking
 ///


[arrow-datafusion] 04/17: touchups

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit d7c1581fdee87a6962b5b4167c74272bd472b04a
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Jun 30 12:56:51 2023 -0400

    touchups
---
 datafusion/core/src/physical_plan/aggregates/row_hash2.rs |  6 +++---
 datafusion/physical-expr/src/aggregate/average.rs         | 10 +++++++---
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
index 90e7cd0724..2eb058d8c5 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
@@ -20,7 +20,7 @@
 //! POC demonstration of GroupByHashApproach
 
 use datafusion_physical_expr::GroupsAccumulator;
-use log::info;
+use log::debug;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::vec;
@@ -123,7 +123,7 @@ impl GroupedHashAggregateStream2 {
         context: Arc<TaskContext>,
         partition: usize,
     ) -> Result<Self> {
-        info!("Creating GroupedHashAggregateStream2");
+        debug!("Creating GroupedHashAggregateStream2");
         let agg_schema = Arc::clone(&agg.schema);
         let agg_group_by = agg.group_by.clone();
         let agg_filter_expr = agg.filter_expr.clone();
@@ -208,7 +208,7 @@ impl GroupedHashAggregateStream2 {
 fn create_accumulators(
     aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
 ) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
-    info!("Creating accumulator for {aggregate_exprs:#?}");
+    debug!("Creating accumulator for {aggregate_exprs:#?}");
     aggregate_exprs
         .into_iter()
         .map(|agg_expr| agg_expr.create_groups_accumulator())
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index b23b555805..7043ed9ce1 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -483,7 +483,8 @@ where
             None => {
                 let iter = group_indicies.iter().zip(data.iter());
                 for (group_index, new_value) in iter {
-                    self.sums[*group_index].add_wrapping(*new_value);
+                    let sum = &mut self.sums[*group_index];
+                    *sum = sum.add_wrapping(*new_value);
                 }
             }
             //
@@ -504,7 +505,8 @@ where
                         group_index_chunk.iter().zip(data_chunk.iter()).for_each(
                             |(group_index, new_value)| {
                                 if (mask & index_mask) != 0 {
-                                    self.sums[*group_index].add_wrapping(*new_value);
+                                    let sum = &mut self.sums[*group_index];
+                                    *sum = sum.add_wrapping(*new_value);
                                 }
                                 index_mask <<= 1;
                             },
@@ -518,7 +520,8 @@ where
                     .enumerate()
                     .for_each(|(i, (group_index, new_value))| {
                         if remainder_bits & (1 << i) != 0 {
-                            self.sums[*group_index].add_wrapping(*new_value);
+                            let sum = &mut self.sums[*group_index];
+                            *sum = sum.add_wrapping(*new_value);
                         }
                     });
             }
@@ -550,6 +553,7 @@ where
 
         // update values
         self.update_sums(values, group_indicies, opt_filter, total_num_groups)?;
+
         Ok(())
     }
 


[arrow-datafusion] 09/17: Refactor out accumulation in average

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 0cfa1bc3c9c17d34f2a5eaf22b4269d31da100d3
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 06:07:44 2023 -0400

    Refactor out accumulation in average
---
 datafusion/physical-expr/src/aggregate/average.rs | 189 ++++++++++++----------
 1 file changed, 105 insertions(+), 84 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 20ccadd7e8..2d9a627a5f 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -431,14 +431,14 @@ impl RowAccumulator for AvgRowAccumulator {
 ///
 /// `F`: The function to invoke for a non null input row to update the
 /// accumulator state. Called like `value_fn(group_index, value)
-fn accumulate_all<T, F, FN>(
+fn accumulate_all<T, F>(
     values: &PrimitiveArray<T>,
     group_indicies: &[usize],
     opt_filter: Option<&arrow_array::BooleanArray>,
-    value_fn: F,
+    mut value_fn: F,
 ) where
     T: ArrowNumericType + Send,
-    F: Fn(usize, T::Native) + Send,
+    F: FnMut(usize, T::Native) + Send,
 {
     assert_eq!(
         values.null_count(), 0,
@@ -454,7 +454,6 @@ fn accumulate_all<T, F, FN>(
     }
 }
 
-
 /// This function is called to update the accumulator state per row,
 /// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
 /// many GroupsAccumulators and thus performance critical.
@@ -466,16 +465,16 @@ fn accumulate_all<T, F, FN>(
 /// `F`: The function to invoke for an input row to update the
 /// accumulator state. Called like `value_fn(group_index, value,
 /// is_valid). NOTE the parameter is true when the value is VALID.
-fn accumulate_all_nullable<T, F, FN>(
+fn accumulate_all_nullable<T, F>(
     values: &PrimitiveArray<T>,
     group_indicies: &[usize],
     opt_filter: Option<&arrow_array::BooleanArray>,
-    value_fn: F,
+    mut value_fn: F,
 ) where
     T: ArrowNumericType + Send,
-    F: Fn(usize, T::Native, bool) + Send,
+    F: FnMut(usize, T::Native, bool) + Send,
 {
-     // AAL TODO handle filter values
+    // AAL TODO handle filter values
     // TODO combine the null mask from values and opt_filter
     let valids = values
         .nulls()
@@ -519,7 +518,6 @@ fn accumulate_all_nullable<T, F, FN>(
         });
 }
 
-
 /// An accumulator to compute the average of PrimitiveArray<T>.
 /// Stores values as native types, and does overflow checking
 ///
@@ -566,6 +564,72 @@ where
         }
     }
 
+    /// Adds one to each group's counter
+    fn increment_counts(
+        &mut self,
+        values: &PrimitiveArray<T>,
+        group_indicies: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) {
+        self.counts.resize(total_num_groups, 0);
+
+        if values.null_count() == 0 {
+            accumulate_all(
+                values,
+                group_indicies,
+                opt_filter,
+                |group_index, _new_value| {
+                    self.counts[group_index] += 1;
+                },
+            )
+        } else {
+            accumulate_all_nullable(
+                values,
+                group_indicies,
+                opt_filter,
+                |group_index, _new_value, is_valid| {
+                    if is_valid {
+                        self.counts[group_index] += 1;
+                    }
+                },
+            )
+        }
+    }
+
+    /// Adds the counts with the partial counts
+    fn update_counts_with_partial_counts(
+        &mut self,
+        partial_counts: &UInt64Array,
+        group_indicies: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) {
+        self.counts.resize(total_num_groups, 0);
+
+        if partial_counts.null_count() == 0 {
+            accumulate_all(
+                partial_counts,
+                group_indicies,
+                opt_filter,
+                |group_index, partial_count| {
+                    self.counts[group_index] += partial_count;
+                },
+            )
+        } else {
+            accumulate_all_nullable(
+                partial_counts,
+                group_indicies,
+                opt_filter,
+                |group_index, partial_count, is_valid| {
+                    if is_valid {
+                        self.counts[group_index] += partial_count;
+                    }
+                },
+            )
+        }
+    }
+
     /// Adds the values in `values` to self.sums
     fn update_sums(
         &mut self,
@@ -573,66 +637,33 @@ where
         group_indicies: &[usize],
         opt_filter: Option<&arrow_array::BooleanArray>,
         total_num_groups: usize,
-    ) -> Result<()> {
+    ) {
         self.sums
             .resize_with(total_num_groups, || T::default_value());
 
-        // AAL TODO
-        // TODO combine the null mask from values and opt_filter
-        let valids = values.nulls();
-
-        // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
-        let data: &[T::Native] = values.values();
-
-        match valids {
-            // use all values in group_index
-            None => {
-                let iter = group_indicies.iter().zip(data.iter());
-                for (group_index, new_value) in iter {
-                    let sum = &mut self.sums[*group_index];
-                    *sum = sum.add_wrapping(*new_value);
-                }
-            }
-            //
-            Some(valids) => {
-                let group_indices_chunks = group_indicies.chunks_exact(64);
-                let data_chunks = data.chunks_exact(64);
-                let bit_chunks = valids.inner().bit_chunks();
-
-                let group_indices_remainder = group_indices_chunks.remainder();
-                let data_remainder = data_chunks.remainder();
-
-                group_indices_chunks
-                    .zip(data_chunks)
-                    .zip(bit_chunks.iter())
-                    .for_each(|((group_index_chunk, data_chunk), mask)| {
-                        // index_mask has value 1 << i in the loop
-                        let mut index_mask = 1;
-                        group_index_chunk.iter().zip(data_chunk.iter()).for_each(
-                            |(group_index, new_value)| {
-                                if (mask & index_mask) != 0 {
-                                    let sum = &mut self.sums[*group_index];
-                                    *sum = sum.add_wrapping(*new_value);
-                                }
-                                index_mask <<= 1;
-                            },
-                        )
-                    });
-
-                let remainder_bits = bit_chunks.remainder_bits();
-                group_indices_remainder
-                    .iter()
-                    .zip(data_remainder.iter())
-                    .enumerate()
-                    .for_each(|(i, (group_index, new_value))| {
-                        if remainder_bits & (1 << i) != 0 {
-                            let sum = &mut self.sums[*group_index];
-                            *sum = sum.add_wrapping(*new_value);
-                        }
-                    });
-            }
+        if values.null_count() == 0 {
+            accumulate_all(
+                values,
+                group_indicies,
+                opt_filter,
+                |group_index, new_value| {
+                    let sum = &mut self.sums[group_index];
+                    *sum = sum.add_wrapping(new_value);
+                },
+            )
+        } else {
+            accumulate_all_nullable(
+                values,
+                group_indicies,
+                opt_filter,
+                |group_index, new_value, is_valid| {
+                    if is_valid {
+                        let sum = &mut self.sums[group_index];
+                        *sum = sum.add_wrapping(new_value);
+                    }
+                },
+            )
         }
-        Ok(())
     }
 }
 
@@ -651,14 +682,8 @@ where
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values.get(0).unwrap().as_primitive::<T>();
 
-        // update counts (TOD account for opt_filter)
-        self.counts.resize(total_num_groups, 0);
-        group_indicies.iter().for_each(|&group_idx| {
-            self.counts[group_idx] += 1;
-        });
-
-        // update values
-        self.update_sums(values, group_indicies, opt_filter, total_num_groups)?;
+        self.increment_counts(values, group_indicies, opt_filter, total_num_groups);
+        self.update_sums(values, group_indicies, opt_filter, total_num_groups);
 
         Ok(())
     }
@@ -672,19 +697,15 @@ where
     ) -> Result<()> {
         assert_eq!(values.len(), 2, "two arguments to merge_batch");
         // first batch is counts, second is partial sums
-        let counts = values.get(0).unwrap().as_primitive::<UInt64Type>();
+        let partial_counts = values.get(0).unwrap().as_primitive::<UInt64Type>();
         let partial_sums = values.get(1).unwrap().as_primitive::<T>();
-
-        // update counts by summing the partial sums (TODO account for opt_filter)
-        self.counts.resize(total_num_groups, 0);
-        group_indicies.iter().zip(counts.values().iter()).for_each(
-            |(&group_idx, &count)| {
-                self.counts[group_idx] += count;
-            },
+        self.update_counts_with_partial_counts(
+            partial_counts,
+            group_indicies,
+            opt_filter,
+            total_num_groups,
         );
-
-        // update values
-        self.update_sums(partial_sums, group_indicies, opt_filter, total_num_groups)?;
+        self.update_sums(partial_sums, group_indicies, opt_filter, total_num_groups);
 
         Ok(())
     }


[arrow-datafusion] 02/17: POC: Demonstrate new GroupHashAggregate stream approach

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit e02c35d649ea999b4a79f44897b31eab8ce606db
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jun 29 09:42:13 2023 -0400

    POC: Demonstrate new GroupHashAggregate stream approach
---
 .../core/src/physical_plan/aggregates/mod.rs       |  18 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  |   2 +
 .../core/src/physical_plan/aggregates/row_hash2.rs | 449 +++++++++++++++++++++
 datafusion/physical-expr/Cargo.toml                |   1 +
 datafusion/physical-expr/src/aggregate/average.rs  | 218 +++++++++-
 .../src/aggregate/groups_accumulator.rs            | 100 +++++
 datafusion/physical-expr/src/aggregate/mod.rs      |  15 +
 datafusion/physical-expr/src/lib.rs                |   2 +
 8 files changed, 801 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 343f7628b7..e086b545b8 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -49,6 +49,7 @@ use std::sync::Arc;
 mod bounded_aggregate_stream;
 mod no_grouping;
 mod row_hash;
+mod row_hash2;
 mod utils;
 
 pub use datafusion_expr::AggregateFunction;
@@ -58,6 +59,8 @@ use datafusion_physical_expr::utils::{
     get_finer_ordering, ordering_satisfy_requirement_concrete,
 };
 
+use self::row_hash2::GroupedHashAggregateStream2;
+
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AggregateMode {
@@ -196,6 +199,7 @@ impl PartialEq for PhysicalGroupBy {
 enum StreamType {
     AggregateStream(AggregateStream),
     GroupedHashAggregateStream(GroupedHashAggregateStream),
+    GroupedHashAggregateStream2(GroupedHashAggregateStream2),
     BoundedAggregate(BoundedAggregateStream),
 }
 
@@ -204,6 +208,7 @@ impl From<StreamType> for SendableRecordBatchStream {
         match stream {
             StreamType::AggregateStream(stream) => Box::pin(stream),
             StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
+            StreamType::GroupedHashAggregateStream2(stream) => Box::pin(stream),
             StreamType::BoundedAggregate(stream) => Box::pin(stream),
         }
     }
@@ -711,12 +716,23 @@ impl AggregateExec {
                 partition,
                 aggregation_ordering,
             )?))
+        } else if self.use_poc_group_by() {
+            Ok(StreamType::GroupedHashAggregateStream2(
+                GroupedHashAggregateStream2::new(self, context, partition)?,
+            ))
         } else {
             Ok(StreamType::GroupedHashAggregateStream(
                 GroupedHashAggregateStream::new(self, context, partition)?,
             ))
         }
     }
+
+    /// Returns true if we should use the POC group by stream
+    /// TODO: check for actually supported aggregates, etc
+    fn use_poc_group_by(&self) -> bool {
+        //info!("AAL Checking POC group by: {self:#?}");
+        true
+    }
 }
 
 impl ExecutionPlan for AggregateExec {
@@ -980,7 +996,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
     Arc::new(Schema::new(group_fields))
 }
 
-/// returns physical expressions to evaluate against a batch
+/// returns physical expressions for arguments to evaluate against a batch
 /// The expressions are different depending on `mode`:
 /// * Partial: AggregateExpr::expressions
 /// * Final: columns of `AggregateExpr::state_fields()`
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index ba02bc096b..5742c17c1d 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -17,6 +17,7 @@
 
 //! Hash aggregation through row format
 
+use log::info;
 use std::cmp::min;
 use std::ops::Range;
 use std::sync::Arc;
@@ -119,6 +120,7 @@ impl GroupedHashAggregateStream {
         context: Arc<TaskContext>,
         partition: usize,
     ) -> Result<Self> {
+        info!("Creating GroupedHashAggregateStream");
         let agg_schema = Arc::clone(&agg.schema);
         let agg_group_by = agg.group_by.clone();
         let agg_filter_expr = agg.filter_expr.clone();
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
new file mode 100644
index 0000000000..90e7cd0724
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+//!
+//! POC demonstration of GroupByHashApproach
+
+use datafusion_physical_expr::GroupsAccumulator;
+use log::info;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use futures::ready;
+use futures::stream::{Stream, StreamExt};
+
+use crate::physical_plan::aggregates::{
+    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+    PhysicalGroupBy,
+};
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use arrow::array::*;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
+use hashbrown::raw::RawTable;
+
+#[derive(Debug, Clone)]
+/// This object tracks the aggregation phase (input/output)
+pub(crate) enum ExecutionState {
+    ReadingInput,
+    /// When producing output, the remaining rows to output are stored
+    /// here and are sliced off as needed in batch_size chunks
+    ProducingOutput(RecordBatch),
+    Done,
+}
+
+use super::AggregateExec;
+
+/// Grouping aggregate
+///
+/// For each aggregation entry, we use:
+/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
+/// - [GroupsAccumulator] to store per group aggregates
+///
+/// The architecture is the following:
+///
+/// TODO
+///
+/// [WordAligned]: datafusion_row::layout
+pub(crate) struct GroupedHashAggregateStream2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+
+    /// Accumulators, one for each `AggregateExpr` in the query
+    accumulators: Vec<Box<dyn GroupsAccumulator>>,
+    /// Argument expressions for each accumulator
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    /// Filter expression to evaluate for each aggregate
+    filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
+
+    /// Converter for each row
+    row_converter: RowConverter,
+    group_by: PhysicalGroupBy,
+
+    /// The memory reservation for this grouping
+    reservation: MemoryReservation,
+
+    /// Logically maps group values to a group_index in `group_by_values`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_by_values`)
+    map: RawTable<(u64, usize)>,
+
+    /// The actual group by values, stored in arrow Row format;
+    /// the index into group_by_values is the group index. See
+    /// https://github.com/apache/arrow-rs/issues/4466
+    group_by_values: Vec<OwnedRow>,
+
+    /// scratch space for the current Batch / Aggregate being
+    /// processed. Saved here to avoid reallocations
+    current_group_indices: Vec<usize>,
+
+    /// generating input/output?
+    exec_state: ExecutionState,
+
+    baseline_metrics: BaselineMetrics,
+
+    random_state: RandomState,
+    /// size to be used for resulting RecordBatches
+    batch_size: usize,
+}
+
+impl GroupedHashAggregateStream2 {
+    /// Create a new GroupedHashAggregateStream2
+    pub fn new(
+        agg: &AggregateExec,
+        context: Arc<TaskContext>,
+        partition: usize,
+    ) -> Result<Self> {
+        info!("Creating GroupedHashAggregateStream2");
+        let agg_schema = Arc::clone(&agg.schema);
+        let agg_group_by = agg.group_by.clone();
+        let agg_filter_expr = agg.filter_expr.clone();
+
+        let batch_size = context.session_config().batch_size();
+        let input = agg.input.execute(partition, Arc::clone(&context))?;
+        let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
+
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        let mut aggregate_exprs = vec![];
+        let mut aggregate_arguments = vec![];
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assuming create_schema() always puts group columns in front of aggregation columns, we set
+        // col_idx_base to the group expression count.
+
+        let all_aggregate_expressions = aggregates::aggregate_expressions(
+            &agg.aggr_expr,
+            &agg.mode,
+            agg_group_by.expr.len(),
+        )?;
+        let filter_expressions = match agg.mode {
+            AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
+            AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                vec![None; agg.aggr_expr.len()]
+            }
+        };
+
+        for (agg_expr, agg_args) in agg
+            .aggr_expr
+            .iter()
+            .zip(all_aggregate_expressions.into_iter())
+        {
+            aggregate_exprs.push(agg_expr.clone());
+            aggregate_arguments.push(agg_args);
+        }
+
+        let accumulators = create_accumulators(aggregate_exprs)?;
+
+        let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
+        let row_converter = RowConverter::new(
+            group_schema
+                .fields()
+                .iter()
+                .map(|f| SortField::new(f.data_type().clone()))
+                .collect(),
+        )?;
+
+        let name = format!("GroupedHashAggregateStream2[{partition}]");
+        let reservation = MemoryConsumer::new(name).register(context.memory_pool());
+        let map = RawTable::with_capacity(0);
+        let group_by_values = vec![];
+        let current_group_indices = vec![];
+
+        timer.done();
+
+        let exec_state = ExecutionState::ReadingInput;
+
+        Ok(GroupedHashAggregateStream2 {
+            schema: agg_schema,
+            input,
+            mode: agg.mode,
+            accumulators,
+            aggregate_arguments,
+            filter_expressions,
+            row_converter,
+            group_by: agg_group_by,
+            reservation,
+            map,
+            group_by_values,
+            current_group_indices,
+            exec_state,
+            baseline_metrics,
+            random_state: Default::default(),
+            batch_size,
+        })
+    }
+}
+
+/// Create a `GroupsAccumulator` for each of the aggregate_exprs to hold the aggregation state
+fn create_accumulators(
+    aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
+) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
+    info!("Creating accumulator for {aggregate_exprs:#?}");
+    aggregate_exprs
+        .into_iter()
+        .map(|agg_expr| agg_expr.create_groups_accumulator())
+        .collect()
+}
+
+impl Stream for GroupedHashAggregateStream2 {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
+
+        loop {
+            let exec_state = self.exec_state.clone();
+            match exec_state {
+                ExecutionState::ReadingInput => {
+                    match ready!(self.input.poll_next_unpin(cx)) {
+                        // new batch to aggregate
+                        Some(Ok(batch)) => {
+                            let timer = elapsed_compute.timer();
+                            let result = self.group_aggregate_batch(batch);
+                            timer.done();
+
+                            // allocate memory
+                            // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
+                            // overshooting a bit. Also this means we either store the whole record batch or not.
+                            let result = result.and_then(|allocated| {
+                                self.reservation.try_grow(allocated)
+                            });
+
+                            if let Err(e) = result {
+                                return Poll::Ready(Some(Err(e)));
+                            }
+                        }
+                        // inner had error, return to caller
+                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                        // inner is done, producing output
+                        None => {
+                            let timer = elapsed_compute.timer();
+                            match self.create_batch_from_map() {
+                                Ok(batch) => {
+                                    self.exec_state =
+                                        ExecutionState::ProducingOutput(batch)
+                                }
+                                Err(e) => return Poll::Ready(Some(Err(e))),
+                            }
+                            timer.done();
+                        }
+                    }
+                }
+
+                ExecutionState::ProducingOutput(batch) => {
+                    // slice off a part of the batch, if needed
+                    let output_batch = if batch.num_rows() <= self.batch_size {
+                        self.exec_state = ExecutionState::Done;
+                        batch
+                    } else {
+                        // output first batch_size rows
+                        let num_remaining = batch.num_rows() - self.batch_size;
+                        let remaining = batch.slice(self.batch_size, num_remaining);
+                        self.exec_state = ExecutionState::ProducingOutput(remaining);
+                        batch.slice(0, self.batch_size)
+                    };
+                    return Poll::Ready(Some(Ok(
+                        output_batch.record_output(&self.baseline_metrics)
+                    )));
+                }
+
+                ExecutionState::Done => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStream2 {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl GroupedHashAggregateStream2 {
+    /// Update the group state based on the group_by values (result of evaluating the group_by expressions)
+    ///
+    /// At the return of this function,
+    /// `self.current_group_indices` has the correct
+    /// group_index for each row in the group_values
+    fn update_group_state(
+        &mut self,
+        group_values: &[ArrayRef],
+        allocated: &mut usize,
+    ) -> Result<()> {
+        // Convert the group keys into the row format
+        let group_rows = self.row_converter.convert_columns(group_values)?;
+        let n_rows = group_rows.num_rows();
+        // 1.1 construct the key from the group values
+        // 1.2 construct the mapping key if it does not exist
+
+        // tracks to which group each of the input rows belongs
+        let group_indices = &mut self.current_group_indices;
+        group_indices.clear();
+
+        // 1.1 Calculate the group keys for the group values
+        let mut batch_hashes = vec![0; n_rows];
+        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+
+        for (row, hash) in batch_hashes.into_iter().enumerate() {
+            let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
+                // verify that a group that we are inserting with hash is
+                // actually the same key value as the group in
+                // existing_idx  (aka group_values @ row)
+
+                // TODO update *allocated based on size of the row
+                // that was just pushed into
+                // aggr_state.group_by_values
+                group_rows.row(row) == self.group_by_values[*group_idx].row()
+            });
+
+            let group_idx = match entry {
+                // Existing group_index for this group value
+                Some((_hash, group_idx)) => *group_idx,
+                //  1.2 Need to create new entry for the group
+                None => {
+                    // Add new entry to aggr_state and save newly created index
+                    let group_idx = self.group_by_values.len();
+                    self.group_by_values.push(group_rows.row(row).owned());
+
+                    // for hasher function, use precomputed hash value
+                    self.map.insert_accounted(
+                        (hash, group_idx),
+                        |(hash, _group_index)| *hash,
+                        allocated,
+                    );
+                    group_idx
+                }
+            };
+            group_indices.push_accounted(group_idx, allocated);
+        }
+        Ok(())
+    }
+
+    /// Perform group-by aggregation for the given [`RecordBatch`].
+    ///
+    /// If successful, returns the additional amount of memory, in
+    /// bytes, that were allocated during this process.
+    ///
+    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
+        // Evaluate the grouping expressions:
+        let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
+
+        // Keep track of memory allocated:
+        let mut allocated = 0usize;
+
+        // Evaluate the aggregation expressions.
+        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
+        // Evaluate the filter expressions, if any, against the inputs
+        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
+
+        let row_converter_size_pre = self.row_converter.size();
+        for group_values in &group_by_values {
+            // calculate the group indices for each input row
+            self.update_group_state(group_values, &mut allocated)?;
+            let group_indices = &self.current_group_indices;
+
+            // Gather the inputs to call the actual aggregation
+            let t = self
+                .accumulators
+                .iter_mut()
+                .zip(input_values.iter())
+                .zip(filter_values.iter());
+
+            let total_num_groups = self.group_by_values.len();
+
+            for ((acc, values), opt_filter) in t {
+                let acc_size_pre = acc.size();
+                let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
+
+                match self.mode {
+                    AggregateMode::Partial | AggregateMode::Single => {
+                        acc.update_batch(
+                            values,
+                            &group_indices,
+                            opt_filter,
+                            total_num_groups,
+                        )?;
+                    }
+                    AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                        // if aggregation is over intermediate states,
+                        // use merge
+                        acc.merge_batch(
+                            values,
+                            &group_indices,
+                            opt_filter,
+                            total_num_groups,
+                        )?;
+                    }
+                }
+
+                allocated += acc.size().saturating_sub(acc_size_pre);
+            }
+        }
+        allocated += self
+            .row_converter
+            .size()
+            .saturating_sub(row_converter_size_pre);
+
+        Ok(allocated)
+    }
+}
+
+impl GroupedHashAggregateStream2 {
+    /// Create an output RecordBatch with all group keys and accumulator states/values
+    fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
+        if self.group_by_values.is_empty() {
+            let schema = self.schema.clone();
+            return Ok(RecordBatch::new_empty(schema));
+        }
+
+        // First output rows are the groups
+        let groups_rows = self.group_by_values.iter().map(|owned_row| owned_row.row());
+
+        let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;
+
+        // Next output the accumulators
+        for acc in self.accumulators.iter_mut() {
+            match self.mode {
+                AggregateMode::Partial => output.extend(acc.state()?),
+                AggregateMode::Final
+                | AggregateMode::FinalPartitioned
+                | AggregateMode::Single => output.push(acc.evaluate()?),
+            }
+        }
+
+        Ok(RecordBatch::try_new(self.schema.clone(), output)?)
+    }
+}
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 04ba2b9e38..a8f82e60e4 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -59,6 +59,7 @@ indexmap = "2.0.0"
 itertools = { version = "0.11", features = ["use_std"] }
 lazy_static = { version = "^1.4.0" }
 libc = "0.2.140"
+log = "^0.4"
 md-5 = { version = "^0.10.0", optional = true }
 paste = "^1.0"
 petgraph = "0.6.2"
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 3c76da51a9..f81c704d8b 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -17,6 +17,9 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
+use arrow::array::AsArray;
+use log::info;
+
 use std::any::Any;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -29,14 +32,14 @@ use crate::aggregate::sum::sum_batch;
 use crate::aggregate::utils::calculate_result_decimal_for_avg;
 use crate::aggregate::utils::down_cast_any_ref;
 use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
+use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
 use arrow::compute;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Decimal128Type, UInt64Type};
 use arrow::{
     array::{ArrayRef, UInt64Array},
     datatypes::Field,
 };
-use arrow_array::Array;
+use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType, PrimitiveArray};
 use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
@@ -155,6 +158,22 @@ impl AggregateExpr for Avg {
             &self.rt_data_type,
         )?))
     }
+
+    /// Builds the vectorized accumulator for AVG.
+    ///
+    /// Only `Decimal128` sums are specialized so far; any other sum type
+    /// yields a `NotImplemented` error.
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        let sum_type = &self.sum_data_type;
+        if !matches!(sum_type, DataType::Decimal128(_, _)) {
+            let msg = format!("AvgGroupsAccumulator for {}", sum_type);
+            return Err(DataFusionError::NotImplemented(msg));
+        }
+
+        let accumulator =
+            AvgGroupsAccumulator::<Decimal128Type>::new(sum_type, &self.rt_data_type);
+        Ok(Box::new(accumulator))
+    }
 }
 
 impl PartialEq<dyn Any> for Avg {
@@ -383,6 +402,199 @@ impl RowAccumulator for AvgRowAccumulator {
     }
 }
 
+/// An accumulator to compute the average of `PrimitiveArray<T>`.
+/// Keeps a count and a sum per group, stored as native types
+#[derive(Debug)]
+struct AvgGroupsAccumulator<T: ArrowNumericType + Send> {
+    /// The type of the internal sum
+    sum_data_type: DataType,
+
+    /// The output type requested by the caller (AVG passes its `rt_data_type`)
+    return_data_type: DataType,
+
+    /// Count per group (use u64 to make UInt64Array)
+    counts: Vec<u64>,
+
+    /// Sums per group, stored as the native type
+    sums: Vec<T::Native>,
+}
+
+impl<T: ArrowNumericType + Send> AvgGroupsAccumulator<T> {
+    /// Creates an empty accumulator for the given sum and return types.
+    pub fn new(sum_data_type: &DataType, return_data_type: &DataType) -> Self {
+        info!(
+            "AvgGroupsAccumulator ({}, sum type: {sum_data_type:?}) --> {return_data_type:?}",
+            std::any::type_name::<T>()
+        );
+        Self {
+            return_data_type: return_data_type.clone(),
+            sum_data_type: sum_data_type.clone(),
+            counts: vec![],
+            sums: vec![],
+        }
+    }
+
+    /// Adds each non-null entry of `values` to the (wrapping) sum of its group.
+    ///
+    /// * `values`: one input value per row
+    /// * `group_indicies`: the group each row belongs to
+    /// * `opt_filter`: not applied yet (TODO: combine with the null mask)
+    /// * `total_num_groups`: `self.sums` is resized to this length first
+    fn update_sums(
+        &mut self,
+        values: &PrimitiveArray<T>,
+        group_indicies: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        self.sums.resize(total_num_groups, T::default_value());
+
+        let data: &[T::Native] = values.values();
+        let sums = &mut self.sums;
+        // `add_wrapping` returns the new total rather than updating in place,
+        // so the result has to be stored back into the group's slot
+        let mut add = |group_index: usize, new_value: T::Native| {
+            sums[group_index] = sums[group_index].add_wrapping(new_value);
+        };
+
+        match values.nulls() {
+            // no null mask: every row contributes
+            None => {
+                for (&group_index, &new_value) in group_indicies.iter().zip(data) {
+                    add(group_index, new_value);
+                }
+            }
+            // null mask: walk validity bits 64 rows at a time (as arrow's sum does)
+            Some(valids) => {
+                let group_indices_chunks = group_indicies.chunks_exact(64);
+                let data_chunks = data.chunks_exact(64);
+                let bit_chunks = valids.inner().bit_chunks();
+
+                let group_indices_remainder = group_indices_chunks.remainder();
+                let data_remainder = data_chunks.remainder();
+
+                group_indices_chunks
+                    .zip(data_chunks)
+                    .zip(bit_chunks.iter())
+                    .for_each(|((group_index_chunk, data_chunk), mask)| {
+                        let rows = group_index_chunk.iter().zip(data_chunk.iter());
+                        for (i, (&group_index, &new_value)) in rows.enumerate() {
+                            // bit i of `mask` is set when row i of the chunk is valid
+                            if mask & (1 << i) != 0 {
+                                add(group_index, new_value);
+                            }
+                        }
+                    });
+
+                // rows left over after the last full chunk of 64
+                let remainder_bits = bit_chunks.remainder_bits();
+                let rows = group_indices_remainder.iter().zip(data_remainder.iter());
+                for (i, (&group_index, &new_value)) in rows.enumerate() {
+                    if remainder_bits & (1 << i) != 0 {
+                        add(group_index, new_value);
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+impl<T: ArrowNumericType + Send> GroupsAccumulator for AvgGroupsAccumulator<T> {
+    /// Folds one batch of raw input into the per-group counts and sums.
+    /// Null inputs are skipped; `opt_filter` is not applied yet (TODO).
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indicies: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values[0].as_primitive::<T>();
+
+        // count only the non-null rows so that the counts stay consistent
+        // with the sums, which skip nulls as well
+        self.counts.resize(total_num_groups, 0);
+        for (row, &group_idx) in group_indicies.iter().enumerate() {
+            if values.is_valid(row) {
+                self.counts[group_idx] += 1;
+            }
+        }
+
+        // update values
+        self.update_sums(values, group_indicies, opt_filter, total_num_groups)
+    }
+
+    /// Folds one batch of intermediate state (counts, then partial sums) into
+    /// the per-group counts and sums; `opt_filter` is not applied yet (TODO).
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indicies: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 2, "two arguments to merge_batch");
+        // first batch is counts, second is partial sums
+        let counts = values[0].as_primitive::<UInt64Type>();
+        let partial_sums = values[1].as_primitive::<T>();
+
+        // update counts by adding up the partial counts
+        self.counts.resize(total_num_groups, 0);
+        for (&group_idx, &count) in group_indicies.iter().zip(counts.values().iter()) {
+            self.counts[group_idx] += count;
+        }
+
+        // update values
+        self.update_sums(partial_sums, group_indicies, opt_filter, total_num_groups)
+    }
+
+    /// Final averages are not produced yet: calling this panics via `todo!()`.
+    fn evaluate(&mut self) -> Result<ArrayRef> {
+        todo!()
+    }
+
+    /// Returns the intermediate state as `[counts, sums]`, taking both
+    /// vectors out of this accumulator (they are left empty).
+    fn state(&mut self) -> Result<Vec<ArrayRef>> {
+        // create array from vec is zero copy
+        let counts = UInt64Array::from(std::mem::take(&mut self.counts));
+        // TODO figure out how to do this without the iter / copy
+        let sums = std::mem::take(&mut self.sums);
+        let sums: PrimitiveArray<T> = PrimitiveArray::from_iter_values(sums);
+
+        // fix up decimal precision and scale; the helper already returns an
+        // `ArrayRef`, so the result is not wrapped in a second `Arc`
+        let sums = set_decimal_precision(&self.sum_data_type, Arc::new(sums))?;
+        Ok(vec![Arc::new(counts) as ArrayRef, sums])
+    }
+
+    /// Bytes reserved for the per-group counts and sums.
+    fn size(&self) -> usize {
+        self.counts.capacity() * std::mem::size_of::<u64>()
+            + self.sums.capacity() * std::mem::size_of::<T::Native>()
+    }
+}
+
+/// Restores the decimal precision and scale on `array`, if applicable
+///
+/// A `Decimal128Array` created from a `Vec` of native values carries the
+/// default precision and scale, so it is re-tagged here with the ones from
+/// `sum_data_type`. Arrays of any other type are returned untouched.
+fn set_decimal_precision(sum_data_type: &DataType, array: ArrayRef) -> Result<ArrayRef> {
+    if let DataType::Decimal128(precision, scale) = sum_data_type {
+        let adjusted = array
+            .as_primitive::<Decimal128Type>()
+            .clone()
+            .with_precision_and_scale(*precision, *scale)?;
+        return Ok(Arc::new(adjusted));
+    }
+
+    // no adjustment needed for other arrays
+    Ok(array)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator.rs
new file mode 100644
index 0000000000..82cfbfaa31
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Vectorized [`GroupsAccumulator`]
+
+use arrow_array::{ArrayRef, BooleanArray};
+use datafusion_common::Result;
+
+/// A `GroupsAccumulator` implements a single aggregate
+/// (e.g. AVG) and stores the state for *all* groups internally
+///
+/// The logical model is that each group is given a `group_index`
+/// assigned and maintained by the hash table.
+///
+/// group_indexes are contiguous (there aren't gaps), and thus it is
+/// expected that each `GroupsAccumulator` will use something like `Vec<..>`
+/// to store the group states.
+pub trait GroupsAccumulator: Send {
+    /// Updates the accumulator's state from a vector of arrays:
+    ///
+    /// * `values`: the input arguments to the accumulator
+    /// * `group_indicies`: the group (group id) to which each row in `values` belongs
+    /// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+    /// * `total_num_groups`: the number of groups (the largest group_index is total_num_groups - 1)
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indicies: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()>;
+
+    /// Returns the final aggregate value for each group as a single
+    /// array (`ArrayRef`)
+    ///
+    /// OPEN QUESTION: Should this method take a "batch_size: usize"
+    /// and produce a Vec<RecordBatch> as output to avoid requiring
+    /// one giant intermediate buffer?
+    ///
+    /// For example, the `SUM` accumulator maintains a running sum,
+    /// and `evaluate` will produce that running sum as its output for
+    /// all groups, in group_index order
+    ///
+    /// This call should be treated as consuming: it cannot take `self`
+    /// by value because the trait must stay object safe, but the
+    /// accumulator is free to release / reset its internal state after
+    /// this call and to error on any subsequent call.
+    fn evaluate(&mut self) -> Result<ArrayRef>;
+
+    /// Returns any intermediate aggregate state used for multi-phase grouping
+    ///
+    /// For example, AVG returns two arrays:  `COUNT` and `SUM`.
+    ///
+    /// This call should be treated as consuming: it cannot take `self`
+    /// by value because the trait must stay object safe, but the
+    /// accumulator is free to release / reset its internal state after
+    /// this call and to error on any subsequent call.
+    ///
+    /// TODO: consider returning a single Array (which could be a
+    /// StructArray) instead
+    fn state(&mut self) -> Result<Vec<ArrayRef>>;
+
+    /// Merges intermediate state (from `state()`) into this accumulator's values
+    ///
+    /// For some aggregates (such as `SUM`), merge_batch is the same
+    /// as `update_batch`, but for some aggregates (such as `COUNT`)
+    /// the operations differ. See [`Self::state`] for more details on how
+    /// state is used and merged.
+    ///
+    /// * `values`: arrays produced by a previous call to `state`
+    /// * `group_indicies`: the group (group id) to which each row in `values` belongs
+    /// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+    /// * `total_num_groups`: the number of groups (the largest group_index is total_num_groups - 1)
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indicies: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()>;
+
+    /// Amount of memory used to store the state of this
+    /// accumulator. This function is called once per batch, so it
+    /// should be O(n) to compute
+    fn size(&self) -> usize;
+}
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 9be6d5e1ba..4b613c8e9b 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -25,6 +25,8 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use self::groups_accumulator::GroupsAccumulator;
+
 pub(crate) mod approx_distinct;
 pub(crate) mod approx_median;
 pub(crate) mod approx_percentile_cont;
@@ -45,6 +47,7 @@ pub(crate) mod median;
 #[macro_use]
 pub(crate) mod min_max;
 pub mod build_in;
+pub(crate) mod groups_accumulator;
 mod hyperloglog;
 pub mod moving_min_max;
 pub mod row_accumulator;
@@ -118,6 +121,18 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
         )))
     }
 
+    /// Return a specialized [`GroupsAccumulator`] that manages state for all groups.
+    /// The default implementation returns a `NotImplemented` error.
+    ///
+    /// For maximum performance, [`GroupsAccumulator`] should be
+    /// implemented rather than [`Accumulator`].
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        // TODO: The default should wrap `self.create_accumulator`
+        Err(DataFusionError::NotImplemented(format!(
+            "GroupsAccumulator hasn't been implemented for {self:?} yet"
+        )))
+    }
+
     /// Construct an expression that calculates the aggregate in reverse.
     /// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
     /// For aggregates that do not support calculation in reverse,
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 0a2e0e58df..6ea8dc9487 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -45,7 +45,9 @@ pub mod var_provider;
 pub mod window;
 
 // reexport this to maintain compatibility with anything that used from_slice previously
+pub use aggregate::groups_accumulator::GroupsAccumulator;
 pub use aggregate::AggregateExpr;
+
 pub use equivalence::{
     project_equivalence_properties, project_ordering_equivalence_properties,
     EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,


[arrow-datafusion] 06/17: Update comments and simplify code

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 82c9205200c0049aae7bfa07815cfcad2e7ffc3c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 05:07:56 2023 -0400

    Update comments and simplify code
---
 .../core/src/physical_plan/aggregates/row_hash2.rs | 54 +++++++++-------------
 1 file changed, 21 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
index 3e9dbfe0cf..792fbb4032 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
@@ -36,7 +36,7 @@ use crate::physical_plan::aggregates::{
     PhysicalGroupBy,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{aggregates, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
@@ -204,12 +204,11 @@ impl GroupedHashAggregateStream2 {
 
         let timer = baseline_metrics.elapsed_compute().timer();
 
-        let mut aggregate_exprs = vec![];
-        let mut aggregate_arguments = vec![];
+        let aggregate_exprs = agg.aggr_expr.clone();
 
-        // The arguments for each aggregate, one vec of expressions
-        // per aggregation.
-        let all_aggregate_expressions = aggregates::aggregate_expressions(
+        // arguments for each aggregate, one vec of expressions per
+        // aggregate
+        let aggregate_arguments = aggregates::aggregate_expressions(
             &agg.aggr_expr,
             &agg.mode,
             agg_group_by.expr.len(),
@@ -222,16 +221,11 @@ impl GroupedHashAggregateStream2 {
             }
         };
 
-        for (agg_expr, agg_args) in agg
-            .aggr_expr
+        // Instantiate the accumulators
+        let accumulators: Vec<_> = aggregate_exprs
             .iter()
-            .zip(all_aggregate_expressions.into_iter())
-        {
-            aggregate_exprs.push(agg_expr.clone());
-            aggregate_arguments.push(agg_args);
-        }
-
-        let accumulators = create_accumulators(aggregate_exprs)?;
+            .map(|agg_expr| agg_expr.create_groups_accumulator())
+            .collect::<Result<_>>()?;
 
         let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
         let row_converter = RowConverter::new(
@@ -273,18 +267,6 @@ impl GroupedHashAggregateStream2 {
     }
 }
 
-/// Crate a [`GroupsAccumulator`] for each of the aggregate_exprs to
-/// hold the aggregation state
-fn create_accumulators(
-    aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
-) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
-    debug!("Creating accumulator for {aggregate_exprs:#?}");
-    aggregate_exprs
-        .into_iter()
-        .map(|agg_expr| agg_expr.create_groups_accumulator())
-        .collect()
-}
-
 impl Stream for GroupedHashAggregateStream2 {
     type Item = Result<RecordBatch>;
 
@@ -363,11 +345,13 @@ impl RecordBatchStream for GroupedHashAggregateStream2 {
 }
 
 impl GroupedHashAggregateStream2 {
-    /// Update self.aggr_state based on the group_by values (result of evalauting the group_by_expressions)
+    /// Calculates the group indicies for each input row of
+    /// `group_values`.
     ///
     /// At the return of this function,
-    /// `self.aggr_state.current_group_indices` has the correct
-    /// group_index for each row in the group_values
+    /// [`Self::current_group_indicies`] has the same number of
+    /// entries as each array in `group_values` and holds the correct
+    /// group_index for that row.
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
@@ -376,6 +360,7 @@ impl GroupedHashAggregateStream2 {
         // Convert the group keys into the row format
         let group_rows = self.row_converter.convert_columns(group_values)?;
         let n_rows = group_rows.num_rows();
+
         // 1.1 construct the key from the group values
         // 1.2 construct the mapping key if it does not exist
 
@@ -426,9 +411,8 @@ impl GroupedHashAggregateStream2 {
     ///
     /// If successful, returns the additional amount of memory, in
     /// bytes, that were allocated during this process.
-    ///
     fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
-        // Evaluate the grouping expressions:
+        // Evaluate the grouping expressions
         let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
 
         // Keep track of memory allocated:
@@ -436,10 +420,12 @@ impl GroupedHashAggregateStream2 {
 
         // Evaluate the aggregation expressions.
         let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
-        // Evalaute the filter expressions, if any, against the inputs
+
+        // Evaluate the filter expressions, if any, against the inputs
         let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
 
         let row_converter_size_pre = self.row_converter.size();
+
         for group_values in &group_by_values {
             // calculate the group indicies for each input row
             self.update_group_state(group_values, &mut allocated)?;
@@ -458,6 +444,8 @@ impl GroupedHashAggregateStream2 {
                 let acc_size_pre = acc.size();
                 let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
 
+                // Call the appropriate method on each aggregator with
+                // the entire input row and the relevant group indexes
                 match self.mode {
                     AggregateMode::Partial | AggregateMode::Single => {
                         acc.update_batch(


[arrow-datafusion] 10/17: Move accumulator to their own function

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 79b1bc9041798d7a0ec97e6e4c32f75bdc9b6eca
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 06:27:34 2023 -0400

    Move accumulator to their own function
---
 datafusion/physical-expr/src/aggregate/average.rs  | 102 +----------------
 .../src/aggregate/groups_accumulator/accumulate.rs | 121 +++++++++++++++++++++
 .../mod.rs}                                        |   2 +
 3 files changed, 124 insertions(+), 101 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 2d9a627a5f..3f3c7820be 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -45,6 +45,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 
+use super::groups_accumulator::accumulate::{accumulate_all, accumulate_all_nullable};
 use super::utils::Decimal128Averager;
 
 /// AVG aggregate expression
@@ -417,107 +418,6 @@ impl RowAccumulator for AvgRowAccumulator {
     }
 }
 
-/// This function is called to update the accumulator state per row,
-/// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
-/// many GroupsAccumulators and thus performance critical.
-///
-/// I couldn't find any way to combine this with
-/// accumulate_all_nullable without having to pass in a is_null on
-/// every row.
-///
-/// * `values`: the input arguments to the accumulator
-/// * `group_indices`:  To which groups do the rows in `values` belong, group id)
-/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
-///
-/// `F`: The function to invoke for a non null input row to update the
-/// accumulator state. Called like `value_fn(group_index, value)
-fn accumulate_all<T, F>(
-    values: &PrimitiveArray<T>,
-    group_indicies: &[usize],
-    opt_filter: Option<&arrow_array::BooleanArray>,
-    mut value_fn: F,
-) where
-    T: ArrowNumericType + Send,
-    F: FnMut(usize, T::Native) + Send,
-{
-    assert_eq!(
-        values.null_count(), 0,
-        "Called accumulate_all with nullable array (call accumulate_all_nullable instead)"
-    );
-
-    // AAL TODO handle filter values
-
-    let data: &[T::Native] = values.values();
-    let iter = group_indicies.iter().zip(data.iter());
-    for (&group_index, &new_value) in iter {
-        value_fn(group_index, new_value)
-    }
-}
-
-/// This function is called to update the accumulator state per row,
-/// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
-/// many GroupsAccumulators and thus performance critical.
-///
-/// * `values`: the input arguments to the accumulator
-/// * `group_indices`:  To which groups do the rows in `values` belong, group id)
-/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
-///
-/// `F`: The function to invoke for an input row to update the
-/// accumulator state. Called like `value_fn(group_index, value,
-/// is_valid). NOTE the parameter is true when the value is VALID.
-fn accumulate_all_nullable<T, F>(
-    values: &PrimitiveArray<T>,
-    group_indicies: &[usize],
-    opt_filter: Option<&arrow_array::BooleanArray>,
-    mut value_fn: F,
-) where
-    T: ArrowNumericType + Send,
-    F: FnMut(usize, T::Native, bool) + Send,
-{
-    // AAL TODO handle filter values
-    // TODO combine the null mask from values and opt_filter
-    let valids = values
-        .nulls()
-        .expect("Called accumulate_all_nullable with non-nullable array (call accumulate_all instead)");
-
-    // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
-    let data: &[T::Native] = values.values();
-
-    let group_indices_chunks = group_indicies.chunks_exact(64);
-    let data_chunks = data.chunks_exact(64);
-    let bit_chunks = valids.inner().bit_chunks();
-
-    let group_indices_remainder = group_indices_chunks.remainder();
-    let data_remainder = data_chunks.remainder();
-
-    group_indices_chunks
-        .zip(data_chunks)
-        .zip(bit_chunks.iter())
-        .for_each(|((group_index_chunk, data_chunk), mask)| {
-            // index_mask has value 1 << i in the loop
-            let mut index_mask = 1;
-            group_index_chunk.iter().zip(data_chunk.iter()).for_each(
-                |(&group_index, &new_value)| {
-                    // valid bit was set, real vale
-                    let is_valid = (mask & index_mask) != 0;
-                    value_fn(group_index, new_value, is_valid);
-                    index_mask <<= 1;
-                },
-            )
-        });
-
-    // handle any remaining bits (after the intial 64)
-    let remainder_bits = bit_chunks.remainder_bits();
-    group_indices_remainder
-        .iter()
-        .zip(data_remainder.iter())
-        .enumerate()
-        .for_each(|(i, (&group_index, &new_value))| {
-            let is_valid = remainder_bits & (1 << i) != 0;
-            value_fn(group_index, new_value, is_valid)
-        });
-}
-
 /// An accumulator to compute the average of PrimitiveArray<T>.
 /// Stores values as native types, and does overflow checking
 ///
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
new file mode 100644
index 0000000000..5d72328763
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Vectorized [`accumulate_all`] and [`accumulate_all_nullable`] functions
+
+use arrow_array::{Array, ArrowNumericType, PrimitiveArray};
+
+/// Invokes `value_fn(group_index, value)` once per row of `values`, which
+/// must not contain nulls (this function panics otherwise). It is the inner
+/// loop for many GroupsAccumulators and thus performance critical.
+///
+/// It is kept separate from `accumulate_all_nullable` because combining
+/// the two would mean passing an is_null flag to the callback on every
+/// row.
+///
+/// * `values`: the input arguments to the accumulator
+/// * `group_indicies`: the group (group id) to which each row in `values` belongs
+/// * `opt_filter`: meant to limit updates to rows where the filter is true; not applied yet
+///
+/// `F`: The function to invoke for each input row to update the
+/// accumulator state. Called like `value_fn(group_index, value)`
+pub fn accumulate_all<T, F>(
+    values: &PrimitiveArray<T>,
+    group_indicies: &[usize],
+    opt_filter: Option<&arrow_array::BooleanArray>,
+    mut value_fn: F,
+) where
+    T: ArrowNumericType + Send,
+    F: FnMut(usize, T::Native) + Send,
+{
+    assert_eq!(
+        values.null_count(), 0,
+        "Called accumulate_all with nullable array (call accumulate_all_nullable instead)"
+    );
+
+    // AAL TODO handle filter values
+
+    // pair every row's group with its value; both are visited in row order
+    let rows = group_indicies.iter().zip(values.values().iter());
+    rows.for_each(|(&group_index, &new_value)| {
+        value_fn(group_index, new_value);
+    });
+}
+
+/// This function is called to update the accumulator state per row,
+/// for a `PrimitiveArray<T>` that has a null mask (it panics if there is
+/// none). It is the inner loop for many GroupsAccumulators and thus
+/// performance critical.
+/// * `values`: the input arguments to the accumulator
+/// * `group_indicies`: the group (group id) to which each row in `values` belongs
+/// * `opt_filter`: meant to limit updates to rows where the filter is true; not applied yet
+///
+/// `F`: The function to invoke for an input row to update the
+/// accumulator state. Called like `value_fn(group_index, value,
+/// is_valid)`. NOTE the last parameter is true when the value is VALID.
+pub fn accumulate_all_nullable<T, F>(
+    values: &PrimitiveArray<T>,
+    group_indicies: &[usize],
+    opt_filter: Option<&arrow_array::BooleanArray>,
+    mut value_fn: F,
+) where
+    T: ArrowNumericType + Send,
+    F: FnMut(usize, T::Native, bool) + Send,
+{
+    // AAL TODO handle filter values
+    // TODO combine the null mask from values and opt_filter
+    let valids = values
+        .nulls()
+        .expect("Called accumulate_all_nullable with non-nullable array (call accumulate_all instead)");
+
+    // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
+    let data: &[T::Native] = values.values();
+
+    let group_indices_chunks = group_indicies.chunks_exact(64);
+    let data_chunks = data.chunks_exact(64);
+    let bit_chunks = valids.inner().bit_chunks();
+
+    let group_indices_remainder = group_indices_chunks.remainder();
+    let data_remainder = data_chunks.remainder();
+
+    group_indices_chunks
+        .zip(data_chunks)
+        .zip(bit_chunks.iter())
+        .for_each(|((group_index_chunk, data_chunk), mask)| {
+            // index_mask has value 1 << i in the loop
+            let mut index_mask = 1;
+            group_index_chunk.iter().zip(data_chunk.iter()).for_each(
+                |(&group_index, &new_value)| {
+                    // true when the validity bit for this row is set
+                    let is_valid = (mask & index_mask) != 0;
+                    value_fn(group_index, new_value, is_valid);
+                    index_mask <<= 1;
+                },
+            )
+        });
+
+    // handle the remaining rows (those after the last full chunk of 64)
+    let remainder_bits = bit_chunks.remainder_bits();
+    group_indices_remainder
+        .iter()
+        .zip(data_remainder.iter())
+        .enumerate()
+        .for_each(|(i, (&group_index, &new_value))| {
+            let is_valid = remainder_bits & (1 << i) != 0;
+            value_fn(group_index, new_value, is_valid)
+        });
+}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
similarity index 99%
rename from datafusion/physical-expr/src/aggregate/groups_accumulator.rs
rename to datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index 82cfbfaa31..680eb927a1 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -17,6 +17,8 @@
 
 //! Vectorized [`GroupsAccumulator`]
 
+pub mod accumulate;
+
 use arrow_array::{ArrayRef, BooleanArray};
 use datafusion_common::Result;
 


[arrow-datafusion] 11/17: update more comments

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit c3496ccd6d719c420d7e4dd546e43ca8b1232f5a
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 06:45:04 2023 -0400

    update more comments
---
 datafusion/physical-expr/src/aggregate/average.rs  | 26 ++++----
 .../src/aggregate/groups_accumulator/accumulate.rs | 70 ++++++++++++++++------
 2 files changed, 66 insertions(+), 30 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 3f3c7820be..ee249f3bd1 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -467,8 +467,8 @@ where
     /// Adds one to each group's counter
     fn increment_counts(
         &mut self,
-        values: &PrimitiveArray<T>,
         group_indicies: &[usize],
+        values: &PrimitiveArray<T>,
         opt_filter: Option<&arrow_array::BooleanArray>,
         total_num_groups: usize,
     ) {
@@ -476,8 +476,8 @@ where
 
         if values.null_count() == 0 {
             accumulate_all(
-                values,
                 group_indicies,
+                values,
                 opt_filter,
                 |group_index, _new_value| {
                     self.counts[group_index] += 1;
@@ -485,8 +485,8 @@ where
             )
         } else {
             accumulate_all_nullable(
-                values,
                 group_indicies,
+                values,
                 opt_filter,
                 |group_index, _new_value, is_valid| {
                     if is_valid {
@@ -500,8 +500,8 @@ where
     /// Adds the counts with the partial counts
     fn update_counts_with_partial_counts(
         &mut self,
-        partial_counts: &UInt64Array,
         group_indicies: &[usize],
+        partial_counts: &UInt64Array,
         opt_filter: Option<&arrow_array::BooleanArray>,
         total_num_groups: usize,
     ) {
@@ -509,8 +509,8 @@ where
 
         if partial_counts.null_count() == 0 {
             accumulate_all(
-                partial_counts,
                 group_indicies,
+                partial_counts,
                 opt_filter,
                 |group_index, partial_count| {
                     self.counts[group_index] += partial_count;
@@ -518,8 +518,8 @@ where
             )
         } else {
             accumulate_all_nullable(
-                partial_counts,
                 group_indicies,
+                partial_counts,
                 opt_filter,
                 |group_index, partial_count, is_valid| {
                     if is_valid {
@@ -533,8 +533,8 @@ where
     /// Adds the values in `values` to self.sums
     fn update_sums(
         &mut self,
-        values: &PrimitiveArray<T>,
         group_indicies: &[usize],
+        values: &PrimitiveArray<T>,
         opt_filter: Option<&arrow_array::BooleanArray>,
         total_num_groups: usize,
     ) {
@@ -543,8 +543,8 @@ where
 
         if values.null_count() == 0 {
             accumulate_all(
-                values,
                 group_indicies,
+                values,
                 opt_filter,
                 |group_index, new_value| {
                     let sum = &mut self.sums[group_index];
@@ -553,8 +553,8 @@ where
             )
         } else {
             accumulate_all_nullable(
-                values,
                 group_indicies,
+                values,
                 opt_filter,
                 |group_index, new_value, is_valid| {
                     if is_valid {
@@ -582,8 +582,8 @@ where
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values.get(0).unwrap().as_primitive::<T>();
 
-        self.increment_counts(values, group_indicies, opt_filter, total_num_groups);
-        self.update_sums(values, group_indicies, opt_filter, total_num_groups);
+        self.increment_counts(group_indicies, values, opt_filter, total_num_groups);
+        self.update_sums(group_indicies, values, opt_filter, total_num_groups);
 
         Ok(())
     }
@@ -600,12 +600,12 @@ where
         let partial_counts = values.get(0).unwrap().as_primitive::<UInt64Type>();
         let partial_sums = values.get(1).unwrap().as_primitive::<T>();
         self.update_counts_with_partial_counts(
-            partial_counts,
             group_indicies,
+            partial_counts,
             opt_filter,
             total_num_groups,
         );
-        self.update_sums(partial_sums, group_indicies, opt_filter, total_num_groups);
+        self.update_sums(group_indicies, partial_sums, opt_filter, total_num_groups);
 
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index 5d72328763..f8a6791def 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -19,23 +19,55 @@
 
 use arrow_array::{Array, ArrowNumericType, PrimitiveArray};
 
-/// This function is called to update the accumulator state per row,
+/// This function is used to update the accumulator state per row,
 /// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
 /// many GroupsAccumulators and thus performance critical.
 ///
-/// I couldn't find any way to combine this with
-/// accumulate_all_nullable without having to pass in a is_null on
-/// every row.
+/// # Arguments:
 ///
 /// * `values`: the input arguments to the accumulator
 /// * `group_indices`:  To which groups do the rows in `values` belong, group id)
-/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+/// * `opt_filter`: if present, invoke value_fn if opt_filter[i] is true
+/// * `value_fn`: function invoked for each (group_index, value) pair.
+///
+/// `F`: Invoked for each input row like `value_fn(group_index, value)`
+///
+/// # Example
+///
+/// ```
+///  ┌─────────┐   ┌─────────┐   ┌ ─ ─ ─ ─ ┐
+///  │ ┌─────┐ │   │ ┌─────┐ │     ┌─────┐
+///  │ │  2  │ │   │ │ 200 │ │   │ │  t  │ │
+///  │ ├─────┤ │   │ ├─────┤ │     ├─────┤
+///  │ │  2  │ │   │ │ 100 │ │   │ │  f  │ │
+///  │ ├─────┤ │   │ ├─────┤ │     ├─────┤
+///  │ │  0  │ │   │ │ 200 │ │   │ │  t  │ │
+///  │ ├─────┤ │   │ ├─────┤ │     ├─────┤
+///  │ │  1  │ │   │ │ 200 │ │   │ │NULL │ │
+///  │ ├─────┤ │   │ ├─────┤ │     ├─────┤
+///  │ │  0  │ │   │ │ 300 │ │   │ │  t  │ │
+///  │ └─────┘ │   │ └─────┘ │     └─────┘
+///  └─────────┘   └─────────┘   └ ─ ─ ─ ─ ┘
+///
+/// group_indices   values        opt_filter
+/// ```
+///
+/// In the example above, `value_fn` is invoked for each (group_index,
+/// value) pair where `opt_filter[i]` is true
+///
+/// ```text
+/// value_fn(2, 200)
+/// value_fn(0, 200)
+/// value_fn(0, 300)
+/// ```
+///
+/// I couldn't find any way to combine this with
+/// accumulate_all_nullable without having to pass in a is_null on
+/// every row.
 ///
-/// `F`: The function to invoke for a non null input row to update the
-/// accumulator state. Called like `value_fn(group_index, value)
 pub fn accumulate_all<T, F>(
-    values: &PrimitiveArray<T>,
     group_indicies: &[usize],
+    values: &PrimitiveArray<T>,
     opt_filter: Option<&arrow_array::BooleanArray>,
     mut value_fn: F,
 ) where
@@ -57,19 +89,16 @@ pub fn accumulate_all<T, F>(
 }
 
 /// This function is called to update the accumulator state per row,
-/// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
-/// many GroupsAccumulators and thus performance critical.
+/// for a `PrimitiveArray<T>` that can have nulls. See
+/// [`accumulate_all`] for more detail and example
 ///
-/// * `values`: the input arguments to the accumulator
-/// * `group_indices`:  To which groups do the rows in `values` belong, group id)
-/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+/// `F`: Invoked like `value_fn(group_index, value, is_valid)`.
 ///
-/// `F`: The function to invoke for an input row to update the
-/// accumulator state. Called like `value_fn(group_index, value,
-/// is_valid). NOTE the parameter is true when the value is VALID.
+/// NOTE the parameter is true when the value is VALID (not when it is
+/// NULL).
 pub fn accumulate_all_nullable<T, F>(
-    values: &PrimitiveArray<T>,
     group_indicies: &[usize],
+    values: &PrimitiveArray<T>,
     opt_filter: Option<&arrow_array::BooleanArray>,
     mut value_fn: F,
 ) where
@@ -119,3 +148,10 @@ pub fn accumulate_all_nullable<T, F>(
             value_fn(group_index, new_value, is_valid)
         });
 }
+
+#[cfg(test)]
+mod test {
+
+    #[test]
+    fn basic() {}
+}


[arrow-datafusion] 01/17: Refactor Decimal128 averaging code to be vectorizable (and easier to read)

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 90b38b0d467eb5f1aaa0dde138035504a0ff64a9
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Jun 30 08:59:34 2023 -0400

    Refactor Decimal128 averaging code to be vectorizable (and easier to read)
---
 datafusion/physical-expr/src/aggregate/utils.rs | 130 +++++++++++++++++-------
 1 file changed, 96 insertions(+), 34 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index f6f0086919..5dfd29ec98 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -37,45 +37,107 @@ pub fn get_accum_scalar_values_as_arrays(
         .collect::<Vec<_>>())
 }
 
-pub fn calculate_result_decimal_for_avg(
-    lit_value: i128,
-    count: i128,
-    scale: i8,
-    target_type: &DataType,
-) -> Result<ScalarValue> {
-    match target_type {
-        DataType::Decimal128(p, s) => {
-            // Different precision for decimal128 can store different range of value.
-            // For example, the precision is 3, the max of value is `999` and the min
-            // value is `-999`
-            let (target_mul, target_min, target_max) = (
-                10_i128.pow(*s as u32),
-                MIN_DECIMAL_FOR_EACH_PRECISION[*p as usize - 1],
-                MAX_DECIMAL_FOR_EACH_PRECISION[*p as usize - 1],
-            );
-            let lit_scale_mul = 10_i128.pow(scale as u32);
-            if target_mul >= lit_scale_mul {
-                if let Some(value) = lit_value.checked_mul(target_mul / lit_scale_mul) {
-                    let new_value = value / count;
-                    if new_value >= target_min && new_value <= target_max {
-                        Ok(ScalarValue::Decimal128(Some(new_value), *p, *s))
-                    } else {
-                        Err(DataFusionError::Execution(
-                            "Arithmetic Overflow in AvgAccumulator".to_string(),
-                        ))
-                    }
-                } else {
-                    // can't convert the lit decimal to the returned data type
-                    Err(DataFusionError::Execution(
-                        "Arithmetic Overflow in AvgAccumulator".to_string(),
-                    ))
-                }
+/// Computes averages for `Decimal128` values, checking for overflow
+///
+/// This is needed because different precisions for Decimal128 can
+/// store different ranges of values and thus sum/count may not fit in
+/// the target type.
+///
+/// For example, the precision is 3, the max of value is `999` and the min
+/// value is `-999`
+pub(crate) struct Decimal128Averager {
+    /// scale factor for sum values (10^sum_scale)
+    sum_mul: i128,
+    /// scale factor for target (10^target_scale)
+    target_mul: i128,
+    /// The minimum output value possible to represent with the target precision
+    target_min: i128,
+    /// The maximum output value possible to represent with the target precision
+    target_max: i128,
+}
+
+impl Decimal128Averager {
+    /// Create a new `Decimal128Averager`:
+    ///
+    /// * sum_scale: the scale of `sum` values passed to [`Self::avg`]
+    /// * target_precision: the output precision
+    /// * target_scale: the output scale
+    ///
+    /// Errors if the resulting data can not be stored
+    pub fn try_new(
+        sum_scale: i8,
+        target_precision: u8,
+        target_scale: i8,
+    ) -> Result<Self> {
+        let sum_mul = 10_i128.pow(sum_scale as u32);
+        let target_mul = 10_i128.pow(target_scale as u32);
+        let target_min = MIN_DECIMAL_FOR_EACH_PRECISION[target_precision as usize - 1];
+        let target_max = MAX_DECIMAL_FOR_EACH_PRECISION[target_precision as usize - 1];
+
+        if target_mul >= sum_mul {
+            Ok(Self {
+                sum_mul,
+                target_mul,
+                target_min,
+                target_max,
+            })
+        } else {
+            // can't convert the lit decimal to the returned data type
+            Err(DataFusionError::Execution(
+                "Arithmetic Overflow in AvgAccumulator".to_string(),
+            ))
+        }
+    }
+
+    /// Returns the `sum`/`count` as a i128 Decimal128 with
+    /// target_scale and target_precision and reporting overflow.
+    ///
+    /// * sum: The total sum value stored as Decimal128 with sum_scale
+    /// (passed to `Self::try_new`)
+    /// * count: total count, stored as a i128 (*NOT* a Decimal128 value)
+    #[inline(always)]
+    pub fn avg(&self, sum: i128, count: i128) -> Result<i128> {
+        if let Some(value) = sum.checked_mul(self.target_mul / self.sum_mul) {
+            let new_value = value / count;
+            if new_value >= self.target_min && new_value <= self.target_max {
+                Ok(new_value)
             } else {
-                // can't convert the lit decimal to the returned data type
                 Err(DataFusionError::Execution(
                     "Arithmetic Overflow in AvgAccumulator".to_string(),
                 ))
             }
+        } else {
+            // can't convert the lit decimal to the returned data type
+            Err(DataFusionError::Execution(
+                "Arithmetic Overflow in AvgAccumulator".to_string(),
+            ))
+        }
+    }
+}
+
+/// Returns `sum`/`count` for decimal values, detecting and reporting overflow.
+///
+/// * sum:  stored as Decimal128 with `sum_scale` scale
+/// * count: stored as a i128 (*NOT* a Decimal128 value)
+/// * sum_scale: the scale of `sum`
+/// * target_type: the output decimal type
+pub fn calculate_result_decimal_for_avg(
+    sum: i128,
+    count: i128,
+    sum_scale: i8,
+    target_type: &DataType,
+) -> Result<ScalarValue> {
+    match target_type {
+        DataType::Decimal128(target_precision, target_scale) => {
+            let new_value =
+                Decimal128Averager::try_new(sum_scale, *target_precision, *target_scale)?
+                    .avg(sum, count)?;
+
+            Ok(ScalarValue::Decimal128(
+                Some(new_value),
+                *target_precision,
+                *target_scale,
+            ))
         }
         other => Err(DataFusionError::Internal(format!(
             "Invalid target type in AvgAccumulator {other:?}"


[arrow-datafusion] 03/17: complete accumulator

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 337353810df503d02245de02357fb1d6ba04f675
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Jun 30 11:28:48 2023 -0400

    complete accumulator
---
 datafusion/physical-expr/src/aggregate/average.rs | 76 ++++++++++++++++++-----
 1 file changed, 61 insertions(+), 15 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f81c704d8b..b23b555805 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -18,7 +18,7 @@
 //! Defines physical expressions that can evaluated at runtime during query execution
 
 use arrow::array::AsArray;
-use log::info;
+use log::debug;
 
 use std::any::Any;
 use std::convert::TryFrom;
@@ -45,6 +45,8 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 
+use super::utils::Decimal128Averager;
+
 /// AVG aggregate expression
 #[derive(Debug, Clone)]
 pub struct Avg {
@@ -161,16 +163,29 @@ impl AggregateExpr for Avg {
 
     fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
         // instantiate specialized accumulator
-        match self.sum_data_type {
-            DataType::Decimal128(_, _) => {
-                Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type>::new(
+        match (&self.sum_data_type, &self.rt_data_type) {
+            (
+                DataType::Decimal128(_sum_precision, sum_scale),
+                DataType::Decimal128(target_precision, target_scale),
+            ) => {
+                let decimal_averager = Decimal128Averager::try_new(
+                    *sum_scale,
+                    *target_precision,
+                    *target_scale,
+                )?;
+
+                let avg_fn =
+                    move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);
+
+                Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(
                     &self.sum_data_type,
                     &self.rt_data_type,
+                    avg_fn,
                 )))
             }
             _ => Err(DataFusionError::NotImplemented(format!(
-                "AvgGroupsAccumulator for {}",
-                self.sum_data_type
+                "AvgGroupsAccumulator for ({} --> {})",
+                self.sum_data_type, self.rt_data_type,
             ))),
         }
     }
@@ -403,9 +418,13 @@ impl RowAccumulator for AvgRowAccumulator {
 }
 
 /// An accumulator to compute the average of PrimitiveArray<T>.
-/// Stores values as native types
+/// Stores values as native types, and does overflow checking
 #[derive(Debug)]
-struct AvgGroupsAccumulator<T: ArrowNumericType + Send> {
+struct AvgGroupsAccumulator<T, F>
+where
+    T: ArrowNumericType + Send,
+    F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+{
     /// The type of the internal sum
     sum_data_type: DataType,
 
@@ -415,13 +434,20 @@ struct AvgGroupsAccumulator<T: ArrowNumericType + Send> {
     /// Count per group (use u64 to make UInt64Array)
     counts: Vec<u64>,
 
-    // Sums per group, stored as the native type
+    /// Sums per group, stored as the native type
     sums: Vec<T::Native>,
+
+    /// Function that computes the average (value / count)
+    avg_fn: F,
 }
 
-impl<T: ArrowNumericType + Send> AvgGroupsAccumulator<T> {
-    pub fn new(sum_data_type: &DataType, return_data_type: &DataType) -> Self {
-        info!(
+impl<T, F> AvgGroupsAccumulator<T, F>
+where
+    T: ArrowNumericType + Send,
+    F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+{
+    pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn: F) -> Self {
+        debug!(
             "AvgGroupsAccumulator ({}, sum type: {sum_data_type:?}) --> {return_data_type:?}",
             std::any::type_name::<T>()
         );
@@ -430,6 +456,7 @@ impl<T: ArrowNumericType + Send> AvgGroupsAccumulator<T> {
             sum_data_type: sum_data_type.clone(),
             counts: vec![],
             sums: vec![],
+            avg_fn,
         }
     }
 
@@ -500,7 +527,11 @@ impl<T: ArrowNumericType + Send> AvgGroupsAccumulator<T> {
     }
 }
 
-impl<T: ArrowNumericType + Send> GroupsAccumulator for AvgGroupsAccumulator<T> {
+impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
+where
+    T: ArrowNumericType + Send,
+    F: Fn(T::Native, u64) -> Result<T::Native> + Send,
+{
     fn update_batch(
         &mut self,
         values: &[ArrayRef],
@@ -549,7 +580,22 @@ impl<T: ArrowNumericType + Send> GroupsAccumulator for AvgGroupsAccumulator<T> {
     }
 
     fn evaluate(&mut self) -> Result<ArrayRef> {
-        todo!()
+        let counts = std::mem::take(&mut self.counts);
+        let sums = std::mem::take(&mut self.sums);
+
+        let averages: Vec<T::Native> = sums
+            .into_iter()
+            .zip(counts.into_iter())
+            .map(|(sum, count)| (self.avg_fn)(sum, count))
+            .collect::<Result<Vec<_>>>()?;
+
+        // TODO figure out how to do this without the iter / copy
+        let array = PrimitiveArray::<T>::from_iter_values(averages);
+
+        // fix up decimal precision and scale for decimals
+        let array = set_decimal_precision(&self.return_data_type, Arc::new(array))?;
+
+        Ok(array)
     }
 
     // return arrays for sums and counts
@@ -563,7 +609,7 @@ impl<T: ArrowNumericType + Send> GroupsAccumulator for AvgGroupsAccumulator<T> {
         // TODO figure out how to do this without the iter / copy
         let sums: PrimitiveArray<T> = PrimitiveArray::from_iter_values(sums);
 
-        // fix up decimal precision and scale
+        // fix up decimal precision and scale for decimals
         let sums = set_decimal_precision(&self.sum_data_type, Arc::new(sums))?;
 
         Ok(vec![


[arrow-datafusion] 17/17: WIP count

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit bcd9e8ed568f41ff6a7555ee8fbedaa890fbefb5
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Mon Jul 3 09:57:24 2023 +0200

    WIP count
---
 datafusion/physical-expr/src/aggregate/count.rs | 238 +++++++++++++++++++++++-
 1 file changed, 236 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 22cb2512fc..c3ad7767b1 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -19,17 +19,23 @@
 
 use std::any::Any;
 use std::fmt::Debug;
+use std::marker::PhantomData;
 use std::ops::BitAnd;
 use std::sync::Arc;
 
 use crate::aggregate::row_accumulator::RowAccumulator;
 use crate::aggregate::utils::down_cast_any_ref;
-use crate::{AggregateExpr, PhysicalExpr};
+use crate::{AggregateExpr, PhysicalExpr, GroupsAccumulator};
 use arrow::array::{Array, Int64Array};
 use arrow::compute;
+use arrow::compute::kernels::cast;
 use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::Field};
-use arrow_buffer::BooleanBuffer;
+use arrow_array::builder::PrimitiveBuilder;
+use arrow_array::cast::AsArray;
+use arrow_array::types::{UInt64Type, Int64Type, UInt32Type, Int32Type};
+use arrow_array::{PrimitiveArray, UInt64Array, ArrowNumericType};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};
 use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
@@ -37,6 +43,8 @@ use datafusion_row::accessor::RowAccessor;
 
 use crate::expressions::format_state_name;
 
+use super::groups_accumulator::accumulate::{accumulate_all, accumulate_all_nullable};
+
 /// COUNT aggregate expression
 /// Returns the amount of non-null values of the given expression.
 #[derive(Debug, Clone)]
@@ -76,6 +84,200 @@ impl Count {
     }
 }
 
+/// An accumulator to compute the count of non-null values of a
+/// PrimitiveArray<T>, tracked separately for each group.
+///
+/// T: the Arrow primitive type of the input values being counted
+/// (the counts themselves are stored as u64)
+#[derive(Debug)]
+struct CountGroupsAccumulator<T>
+where T: ArrowNumericType + Send,
+{
+    /// The type of the returned count
+    return_data_type: DataType,
+
+    /// Count per group (use u64 to make UInt64Array)
+    counts: Vec<u64>,
+
+    /// If we have seen a null input value for this group_index
+    null_inputs: BooleanBufferBuilder,
+
+    // Bind it to struct
+    phantom: PhantomData<T>
+}
+
+
+impl<T> CountGroupsAccumulator<T>
+where T: ArrowNumericType + Send,
+{
+    pub fn new(return_data_type: &DataType) -> Self {
+        Self {
+            return_data_type: return_data_type.clone(),
+            counts: vec![],
+            null_inputs: BooleanBufferBuilder::new(0),
+            phantom: PhantomData {}
+        }
+    }
+
+        /// Adds one to each group's counter
+        fn increment_counts(
+            &mut self,
+            group_indices: &[usize],
+            values: &PrimitiveArray<T>,
+            opt_filter: Option<&arrow_array::BooleanArray>,
+            total_num_groups: usize,
+        ) {
+            self.counts.resize(total_num_groups, 0);
+    
+            if values.null_count() == 0 {
+                accumulate_all(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    |group_index, _new_value| {
+                        self.counts[group_index] += 1;
+                    }
+                )
+            }else {
+                accumulate_all_nullable(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    |group_index, _new_value, is_valid| {
+                        if is_valid {
+                            self.counts[group_index] += 1;
+                        }
+                    },
+                )
+            }
+        }
+
+        /// Adds the counts with the partial counts
+        fn update_counts_with_partial_counts(
+            &mut self,
+            group_indices: &[usize],
+            partial_counts: &UInt64Array,
+            opt_filter: Option<&arrow_array::BooleanArray>,
+            total_num_groups: usize,
+        ) {
+            self.counts.resize(total_num_groups, 0);
+    
+            if partial_counts.null_count() == 0 {
+                accumulate_all(
+                    group_indices,
+                    partial_counts,
+                    opt_filter,
+                    |group_index, partial_count| {
+                        self.counts[group_index] += partial_count;
+                    },
+                )
+            } else {
+                accumulate_all_nullable(
+                    group_indices,
+                    partial_counts,
+                    opt_filter,
+                    |group_index, partial_count, is_valid| {
+                        if is_valid {
+                            self.counts[group_index] += partial_count;
+                        }
+                    },
+                )
+            }
+        }
+
+        /// Returns a NullBuffer representing which group_indices have
+        /// null values (if they saw a null input)
+        /// Resets `self.null_inputs`;
+        fn build_nulls(&mut self) -> Option<NullBuffer> {
+            let nulls = NullBuffer::new(self.null_inputs.finish());
+            if nulls.null_count() > 0 {
+                Some(nulls)
+            } else {
+                None
+            }
+        }
+}
+
+impl <T> GroupsAccumulator for CountGroupsAccumulator<T>
+where T: ArrowNumericType + Send
+{
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values.get(0).unwrap().as_primitive::<T>();
+
+        self.increment_counts(group_indices, values, opt_filter, total_num_groups);
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&arrow_array::BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+        // the single input array holds the partial counts
+        let partial_counts = values.get(0).unwrap().as_primitive::<UInt64Type>();
+        self.update_counts_with_partial_counts(
+            group_indices,
+            partial_counts,
+            opt_filter,
+            total_num_groups,
+        );
+
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ArrayRef> {
+        let counts = std::mem::take(&mut self.counts);
+        let nulls = self.build_nulls();
+
+        // groups marked as null in `nulls` are emitted as NULL instead of a count
+        let array: PrimitiveArray<UInt64Type> = if let Some(nulls) = nulls.as_ref() {
+            let mut builder = PrimitiveBuilder::<UInt64Type>::with_capacity(nulls.len());
+            let iter = counts.into_iter().zip(nulls.iter());
+
+            for (count, is_valid) in iter {
+                if is_valid {
+                    builder.append_value(count)
+                } else {
+                    builder.append_null();
+                }
+            }
+            builder.finish()
+        } else {
+            PrimitiveArray::<UInt64Type>::new(counts.into(), nulls) // no copy
+        };
+        // TODO remove cast
+        let array = cast(&array, &self.return_data_type)?;
+
+        Ok(array)
+    }
+
+    // return the array of partial counts
+    fn state(&mut self) -> Result<Vec<ArrayRef>> {
+        // TODO nulls
+        let nulls = self.build_nulls();
+        let counts = std::mem::take(&mut self.counts);
+        let counts = UInt64Array::from(counts); // zero copy
+        Ok(vec![
+            Arc::new(counts) as ArrayRef,
+        ])
+    }
+
+    fn size(&self) -> usize {
+        self.counts.capacity() * std::mem::size_of::<usize>()
+    }
+}
+
 /// count null values for multiple columns
 /// for each row if one column value is null, then null_count + 1
 fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize {
@@ -147,6 +349,38 @@ impl AggregateExpr for Count {
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(CountAccumulator::new()))
     }
+
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+        // instantiate specialized accumulator
+        match &self.data_type {
+            DataType::UInt64 => {
+                Ok(Box::new(CountGroupsAccumulator::<UInt64Type>::new(
+                    &self.data_type,
+                )))
+            },
+                DataType::Int64 => {
+                Ok(Box::new(CountGroupsAccumulator::<Int64Type>::new(
+                    &self.data_type,
+                )))
+            },
+                DataType::UInt32 => {
+                Ok(Box::new(CountGroupsAccumulator::<UInt32Type>::new(
+                    &self.data_type,
+                )))
+            },
+                DataType::Int32 => {
+                Ok(Box::new(CountGroupsAccumulator::<Int32Type>::new(
+                    &self.data_type,
+                )))
+            }
+
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "CountGroupsAccumulator not supported for {}",
+                self.data_type
+            ))),
+        }
+
+    }
 }
 
 impl PartialEq<dyn Any> for Count {


[arrow-datafusion] 07/17: factor out accumulate

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 668063689250cd9a0a7883af8299c4d3bc17f1f1
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 05:24:40 2023 -0400

    factor out accumulate
---
 datafusion/physical-expr/src/aggregate/average.rs | 93 ++++++++++++++++++++++-
 1 file changed, 90 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 7043ed9ce1..0dcff7ec9b 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -417,8 +417,95 @@ impl RowAccumulator for AvgRowAccumulator {
     }
 }
 
+/// Invokes `value_fn` or `null_fn` once per row of a
+/// `PrimitiveArray<T>` to update accumulator state. It is the inner
+/// loop for many GroupsAccumulators and thus performance critical.
+///
+/// * `values`: the input arguments to the accumulator
+/// * `group_indicies`: the group id to which each row in `values` belongs
+/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
+///
+/// `F`: The function to invoke for a non-null input row to update the
+/// accumulator state. Called like `value_fn(group_index, value)`.
+///
+/// `FN`: The function to call for each null input row. Called like
+/// `null_fn(group_index)`.
+fn accumulate_all<T, F, FN>(
+    values: &PrimitiveArray<T>,
+    group_indicies: &[usize],
+    opt_filter: Option<&arrow_array::BooleanArray>,
+    value_fn: F,
+    null_fn: FN,
+) where
+    T: ArrowNumericType + Send,
+    F: Fn(usize, T::Native) + Send,
+    FN: Fn(usize) + Send,
+{
+    // AAL TODO handle filter values
+    // TODO combine the null mask from values and opt_filter
+    let valids = values.nulls();
+
+    // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
+    let data: &[T::Native] = values.values();
+
+    match valids {
+        // no nulls
+        None => {
+            let iter = group_indicies.iter().zip(data.iter());
+            for (&group_index, &new_value) in iter {
+                value_fn(group_index, new_value)
+            }
+        }
+        // there are nulls, so handle them specially
+        Some(valids) => {
+            let group_indices_chunks = group_indicies.chunks_exact(64);
+            let data_chunks = data.chunks_exact(64);
+            let bit_chunks = valids.inner().bit_chunks();
+
+            let group_indices_remainder = group_indices_chunks.remainder();
+            let data_remainder = data_chunks.remainder();
+
+            group_indices_chunks
+                .zip(data_chunks)
+                .zip(bit_chunks.iter())
+                .for_each(|((group_index_chunk, data_chunk), mask)| {
+                    // index_mask has value 1 << i in the loop
+                    let mut index_mask = 1;
+                    group_index_chunk.iter().zip(data_chunk.iter()).for_each(
+                        |(&group_index, &new_value)| {
+                            // valid bit was set, real value
+                            if (mask & index_mask) != 0 {
+                                value_fn(group_index, new_value);
+                            } else {
+                                null_fn(group_index)
+                            }
+                            index_mask <<= 1;
+                        },
+                    )
+                });
+
+            // handle any remaining bits (after the full chunks of 64)
+            let remainder_bits = bit_chunks.remainder_bits();
+            group_indices_remainder
+                .iter()
+                .zip(data_remainder.iter())
+                .enumerate()
+                .for_each(|(i, (&group_index, &new_value))| {
+                    if remainder_bits & (1 << i) != 0 {
+                        value_fn(group_index, new_value)
+                    } else {
+                        null_fn(group_index)
+                    }
+                });
+        }
+    }
+}
+
 /// An accumulator to compute the average of PrimitiveArray<T>.
 /// Stores values as native types, and does overflow checking
+///
+/// F: Function that calculates the average value from a sum of
+/// T::Native and a total count
 #[derive(Debug)]
 struct AvgGroupsAccumulator<T, F>
 where
@@ -597,7 +684,7 @@ where
         let array = PrimitiveArray::<T>::from_iter_values(averages);
 
         // fix up decimal precision and scale for decimals
-        let array = set_decimal_precision(&self.return_data_type, Arc::new(array))?;
+        let array = adjust_output_array(&self.return_data_type, Arc::new(array))?;
 
         Ok(array)
     }
@@ -614,7 +701,7 @@ where
         let sums: PrimitiveArray<T> = PrimitiveArray::from_iter_values(sums);
 
         // fix up decimal precision and scale for decimals
-        let sums = set_decimal_precision(&self.sum_data_type, Arc::new(sums))?;
+        let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
 
         Ok(vec![
             Arc::new(counts) as ArrayRef,
@@ -631,7 +718,7 @@ where
 ///
 /// Decimal128Arrays are are are created from Vec<NativeType> with default
 /// precision and scale. This function adjusts them down.
-fn set_decimal_precision(sum_data_type: &DataType, array: ArrayRef) -> Result<ArrayRef> {
+fn adjust_output_array(sum_data_type: &DataType, array: ArrayRef) -> Result<ArrayRef> {
     let array = match sum_data_type {
         DataType::Decimal128(p, s) => Arc::new(
             array


[arrow-datafusion] 13/17: more tets

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 83ff9cf83ea8cb137c6ed469f38039d29816e276
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 07:12:59 2023 -0400

    more tets
---
 .../src/aggregate/groups_accumulator/accumulate.rs | 56 +++++++++++++++++-----
 1 file changed, 45 insertions(+), 11 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index f30bed47a6..26baad723a 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -17,7 +17,7 @@
 
 //! Vectorized [`accumulate`] and [`accumulate_nullable`] functions
 
-use arrow_array::{Array, ArrowNumericType, PrimitiveArray};
+use arrow_array::{Array, ArrowNumericType, BooleanArray, PrimitiveArray};
 
 /// This function is used to update the accumulator state per row,
 /// for a `PrimitiveArray<T>` with no nulls. It is the inner loop for
@@ -68,7 +68,7 @@ use arrow_array::{Array, ArrowNumericType, PrimitiveArray};
 pub fn accumulate_all<T, F>(
     group_indicies: &[usize],
     values: &PrimitiveArray<T>,
-    opt_filter: Option<&arrow_array::BooleanArray>,
+    opt_filter: Option<&BooleanArray>,
     mut value_fn: F,
 ) where
     T: ArrowNumericType + Send,
@@ -99,7 +99,7 @@ pub fn accumulate_all<T, F>(
 pub fn accumulate_all_nullable<T, F>(
     group_indicies: &[usize],
     values: &PrimitiveArray<T>,
-    opt_filter: Option<&arrow_array::BooleanArray>,
+    opt_filter: Option<&BooleanArray>,
     mut value_fn: F,
 ) where
     T: ArrowNumericType + Send,
@@ -156,15 +156,14 @@ mod test {
     use arrow_array::UInt32Array;
 
     #[test]
-    fn no_nulls_no_filter() {
+    fn accumulate_no_filter() {
         let fixture = Fixture::new();
-        let opt_filter = None;
         let mut accumulated = vec![];
 
         accumulate_all(
             &fixture.group_indices,
             &fixture.values_array(),
-            opt_filter,
+            fixture.opt_filter(),
             |group_index, value| accumulated.push((group_index, value)),
         );
 
@@ -179,15 +178,29 @@ mod test {
     }
 
     #[test]
-    fn nulls_no_filter() {
+    #[should_panic(
+        expected = "assertion failed: `(left == right)`\n  left: `34`,\n right: `0`: Called accumulate_all with nullable array (call accumulate_all_nullable instead)"
+    )]
+    fn accumulate_with_nullable_panics() {
+        let fixture = Fixture::new();
+        // call with an array that has nulls should panic
+        accumulate_all(
+            &fixture.group_indices,
+            &fixture.values_with_nulls_array(),
+            fixture.opt_filter(),
+            |_, _| {},
+        );
+    }
+
+    #[test]
+    fn accumulate_nullable_no_filter() {
         let fixture = Fixture::new();
-        let opt_filter = None;
         let mut accumulated = vec![];
 
         accumulate_all_nullable(
             &fixture.group_indices,
             &fixture.values_with_nulls_array(),
-            opt_filter,
+            fixture.opt_filter(),
             |group_index, value, is_valid| {
                 let value = if is_valid { Some(value) } else { None };
                 accumulated.push((group_index, value));
@@ -204,9 +217,22 @@ mod test {
             })
     }
 
-    // TODO: filter testing with/without null
+    #[test]
+    #[should_panic(
+        expected = "Called accumulate_all_nullable with non-nullable array (call accumulate_all instead)"
+    )]
+    fn accumulate_nullable_with_non_nullable_panics() {
+        let fixture = Fixture::new();
+        // calling with an array that has no nulls should panic
+        accumulate_all_nullable(
+            &fixture.group_indices,
+            &fixture.values_array(),
+            fixture.opt_filter(),
+            |_, _, _| {},
+        );
+    }
 
-    // TODO: calling nulls/nonulls with wrong one panics
+    // TODO: filter testing with/without null
 
     // fuzz testing
 
@@ -221,6 +247,9 @@ mod test {
         /// same as values, but every third is null:
         /// None, Some(20), Some(30), None ...
         values_with_nulls: Vec<Option<u32>>,
+
+        /// Optional filter (defaults to None)
+        opt_filter: Option<BooleanArray>,
     }
 
     impl Fixture {
@@ -231,6 +260,7 @@ mod test {
                 values_with_nulls: (0..100)
                     .map(|i| if i % 3 == 0 { None } else { Some((i + 1) * 10) })
                     .collect(),
+                opt_filter: None,
             }
         }
 
@@ -243,5 +273,9 @@ mod test {
         fn values_with_nulls_array(&self) -> UInt32Array {
             UInt32Array::from(self.values_with_nulls.clone())
         }
+
+        fn opt_filter(&self) -> Option<&BooleanArray> {
+            self.opt_filter.as_ref()
+        }
     }
 }


[arrow-datafusion] 12/17: Begin writing tests for accumulate

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 190c43c14814612a7aa334b63e0ff521f03c3160
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 07:04:49 2023 -0400

    Begin writing tests for accumulate
---
 .../src/aggregate/groups_accumulator/accumulate.rs | 94 +++++++++++++++++++++-
 1 file changed, 92 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index f8a6791def..f30bed47a6 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -34,7 +34,7 @@ use arrow_array::{Array, ArrowNumericType, PrimitiveArray};
 ///
 /// # Example
 ///
-/// ```
+/// ```text
 ///  ┌─────────┐   ┌─────────┐   ┌ ─ ─ ─ ─ ┐
 ///  │ ┌─────┐ │   │ ┌─────┐ │     ┌─────┐
 ///  │ │  2  │ │   │ │ 200 │ │   │ │  t  │ │
@@ -151,7 +151,97 @@ pub fn accumulate_all_nullable<T, F>(
 
 #[cfg(test)]
 mod test {
+    use super::*;
+
+    use arrow_array::UInt32Array;
 
     #[test]
-    fn basic() {}
+    fn no_nulls_no_filter() {
+        let fixture = Fixture::new();
+        let opt_filter = None;
+        let mut accumulated = vec![];
+
+        accumulate_all(
+            &fixture.group_indices,
+            &fixture.values_array(),
+            opt_filter,
+            |group_index, value| accumulated.push((group_index, value)),
+        );
+
+        // Should have seen all indexes and values in order
+        accumulated
+            .into_iter()
+            .enumerate()
+            .for_each(|(i, (group_index, value))| {
+                assert_eq!(group_index, fixture.group_indices[i]);
+                assert_eq!(value, fixture.values[i]);
+            })
+    }
+
+    #[test]
+    fn nulls_no_filter() {
+        let fixture = Fixture::new();
+        let opt_filter = None;
+        let mut accumulated = vec![];
+
+        accumulate_all_nullable(
+            &fixture.group_indices,
+            &fixture.values_with_nulls_array(),
+            opt_filter,
+            |group_index, value, is_valid| {
+                let value = if is_valid { Some(value) } else { None };
+                accumulated.push((group_index, value));
+            },
+        );
+
+        // Should have seen all indexes and values in order
+        accumulated
+            .into_iter()
+            .enumerate()
+            .for_each(|(i, (group_index, value))| {
+                assert_eq!(group_index, fixture.group_indices[i]);
+                assert_eq!(value, fixture.values_with_nulls[i]);
+            })
+    }
+
+    // TODO: filter testing with/without null
+
+    // TODO: calling nulls/nonulls with wrong one panics
+
+    // fuzz testing
+
+    /// Values for testing (there are enough values to exercise the 64 bit chunks)
+    struct Fixture {
+        /// 0, 1, ... 99
+        group_indices: Vec<usize>,
+
+        /// 10, 20, ... 1000
+        values: Vec<u32>,
+
+        /// same as values, but every third is null:
+        /// None, Some(20), Some(30), None ...
+        values_with_nulls: Vec<Option<u32>>,
+    }
+
+    impl Fixture {
+        fn new() -> Self {
+            Self {
+                group_indices: (0..100).collect(),
+                values: (0..100).map(|i| (i + 1) * 10).collect(),
+                values_with_nulls: (0..100)
+                    .map(|i| if i % 3 == 0 { None } else { Some((i + 1) * 10) })
+                    .collect(),
+            }
+        }
+
+        /// returns `Self::values` as an Array
+        fn values_array(&self) -> UInt32Array {
+            UInt32Array::from(self.values.clone())
+        }
+
+        /// returns `Self::values_with_nulls` as an Array
+        fn values_with_nulls_array(&self) -> UInt32Array {
+            UInt32Array::from(self.values_with_nulls.clone())
+        }
+    }
 }


[arrow-datafusion] 14/17: more tests

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 889345c44647096a81f5b3349cb09d584e48c638
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 1 07:14:54 2023 -0400

    more tests
---
 .../physical-expr/src/aggregate/groups_accumulator/accumulate.rs       | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index 26baad723a..24e4b04661 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -82,6 +82,8 @@ pub fn accumulate_all<T, F>(
     // AAL TODO handle filter values
 
     let data: &[T::Native] = values.values();
+    assert_eq!(data.len(), group_indicies.len());
+
     let iter = group_indicies.iter().zip(data.iter());
     for (&group_index, &new_value) in iter {
         value_fn(group_index, new_value)
@@ -113,6 +115,7 @@ pub fn accumulate_all_nullable<T, F>(
 
     // This is based on (ahem, COPY/PASTA) arrow::compute::aggregate::sum
     let data: &[T::Native] = values.values();
+    assert_eq!(data.len(), group_indicies.len());
 
     let group_indices_chunks = group_indicies.chunks_exact(64);
     let data_chunks = data.chunks_exact(64);