Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/29 11:10:06 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request, #2375: WIP: Use row format for aggregate

yjshen opened a new pull request, #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375

   **The current PR seems scary in size; maybe I should split the physical_plan folder re-org into a separate PR first.**
   
   
   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866978192


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -315,6 +354,84 @@ pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
     })
 }
 
+pub(crate) fn add_to_row(
+    dt: &DataType,
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    match (dt, s) {
+        // float64 coerces everything to f64

Review Comment:
   Agree. We should clean this up, probably by checking type coercions.





[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866742045


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -144,7 +173,8 @@ fn sum_decimal_batch(
 }
 
 // sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
+pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result<ScalarValue> {
+    let values = &cast(values, sum_type)?;

Review Comment:
   This is a partial fix for #2455. We should cast the input array to the sum result datatype first to reduce the possibility of overflow. Further, we should have a wrapping sum kernel as well as a try_sum kernel to produce wrapped results or nulls in the case of overflow.
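   
   As a rough sketch of the cast-then-sum idea (`sum_as_i64` is a hypothetical helper, not code from this PR):
   
   ```rust
   use arrow::array::{ArrayRef, Int64Array};
   use arrow::compute::{cast, sum};
   use arrow::datatypes::DataType;
   use arrow::error::Result;
   
   // Widen the input (e.g. Int8/Int16/Int32) to the sum result type before
   // summing, so the accumulation is less likely to overflow the narrower
   // input type.
   fn sum_as_i64(values: &ArrayRef) -> Result<Option<i64>> {
       let widened = cast(values, &DataType::Int64)?;
       let widened = widened
           .as_any()
           .downcast_ref::<Int64Array>()
           .expect("cast to Int64 yields an Int64Array");
       Ok(sum(widened))
   }
   ```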





[GitHub] [arrow-datafusion] yjshen commented on pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1119257857

   Thanks @alamb for the detailed review ❤️! I'll try to answer or fix everything today.
   
   For the benchmark you mentioned, I have benchmarked it with the existing aggregate_query_sql benchmark, with a newly added case:
   
   The results are:
   
   ## The master branch
   
   ## This PR
   




[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866926702


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,
+    datatype: DataType,
+}
+
+impl SumAccumulatorV2 {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl AccumulatorV2 for SumAccumulatorV2 {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        add_to_row(&self.datatype, self.index, accessor, &sum_batch(values)?)?;

Review Comment:
   I initially used `sum_batch` here mainly to reduce code duplication with `SumAccumulator`; besides, there's a decimal `sum_batch` that isn't included in the compute kernels yet.





[GitHub] [arrow-datafusion] andygrove merged pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
andygrove merged PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r867037033


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -144,7 +173,8 @@ fn sum_decimal_batch(
 }
 
 // sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
+pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result<ScalarValue> {
+    let values = &cast(values, sum_type)?;

Review Comment:
   I also wonder if we could internally consider summing smaller integers using u128 and then detecting overflow at the end. 🤔
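   
   Something like this sketch of the idea (`checked_sum_i32` is a hypothetical helper):
   
   ```rust
   // Accumulate narrow integers in a 128-bit integer so the intermediate
   // additions cannot overflow, then detect overflow of the i64 result
   // type once at the end.
   fn checked_sum_i32(values: &[i32]) -> Option<i64> {
       let total: i128 = values.iter().map(|&v| i128::from(v)).sum();
       i64::try_from(total).ok() // None signals overflow
   }
   ```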





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866315598


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The architecture is similar to that of [`super::GroupedHashAggregateStream`], but it uses
+/// the row format inside the hash table to store aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedHashAggregateStreamV2
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assuming create_schema() always puts group columns in front of aggr columns, we set
+        // col_idx_base to the group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(

Review Comment:
   This code effectively makes one massive output record batch -- I think this is also what `GroupedHashAggregateStream` does, but in my opinion it would be better to stream the output (aka respect the `batch_size` configuration). Maybe we can file a ticket to do so.
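   
   For illustration, respecting `batch_size` could look roughly like slicing the single large batch before emitting it (`slice_output` is a hypothetical helper, not code in this PR):
   
   ```rust
   use arrow::record_batch::RecordBatch;
   
   // Split one large output batch into `batch_size`-row chunks so that
   // downstream operators see bounded batches.
   fn slice_output(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
       (0..batch.num_rows())
           .step_by(batch_size.max(1)) // max(1) guards against step_by(0) panicking
           .map(|offset| {
               let len = batch_size.min(batch.num_rows() - offset);
               batch.slice(offset, len)
           })
           .collect()
   }
   ```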



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
+impl RecordBatchStream for GroupedHashAggregateStreamV2 {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]

Review Comment:
   👍 



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -315,6 +354,84 @@ pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
     })
 }
 
+pub(crate) fn add_to_row(
+    dt: &DataType,
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    match (dt, s) {
+        // float64 coerces everything to f64

Review Comment:
   🤔 I almost wonder how valuable supporting all these types is -- I wonder if we could use `u64` or `i64` accumulators for all integer types and `f64` for floats to reduce the code. I don't think this PR is making things any better or worse, but these type `match` statements are so common and repetitive.
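   
   Roughly this kind of funnel (a sketch only; `scalar_to_i64` is a hypothetical helper):
   
   ```rust
   use datafusion_common::ScalarValue;
   
   // Funnel every signed integer variant into one i64 accumulator path,
   // instead of one match arm per concrete width and operation.
   fn scalar_to_i64(s: &ScalarValue) -> Option<i64> {
       match s {
           ScalarValue::Int8(v) => v.map(i64::from),
           ScalarValue::Int16(v) => v.map(i64::from),
           ScalarValue::Int32(v) => v.map(i64::from),
           ScalarValue::Int64(v) => *v,
           _ => None,
       }
   }
   ```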



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,

Review Comment:
   I think it would help to document what this is an `index` into -- i.e., document what the parameters are.
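   
   Something along these lines (the wording is my guess at the intent):
   
   ```rust
   use arrow::datatypes::DataType;
   
   #[derive(Debug)]
   struct SumAccumulatorV2 {
       /// Index of this accumulator's state field within the row-format
       /// aggregation buffer accessed through `RowAccessor`.
       index: usize,
       /// The type the sum is accumulated in (the state's datatype).
       datatype: DataType,
   }
   ```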



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]
+#[allow(clippy::too_many_arguments)]
+fn group_aggregate_batch(
+    mode: &AggregateMode,
+    random_state: &RandomState,
+    group_expr: &[Arc<dyn PhysicalExpr>],
+    accumulators: &mut [AccumulatorItemV2],
+    group_schema: &Schema,
+    state_layout: &RowLayout,
+    batch: RecordBatch,
+    aggr_state: &mut AggregationState,
+    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // evaluate the grouping expressions
+    let group_values = evaluate(group_expr, &batch)?;
+    let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+
+    // evaluate the aggregation expressions.
+    // We could evaluate them after the `take`, but since we need to evaluate all
+    // of them anyways, it is more performant to do it while they are together.
+    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+    // 1.1 construct the key from the group values
+    // 1.2 construct the mapping key if it does not exist
+    // 1.3 add the row's index to `indices`
+
+    // track which entries in `aggr_state` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
+
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let AggregationState { map, group_states } = aggr_state;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_rows[row] == group_state.group_by_values
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
+                };
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
+                // Add new entry to group_states and save newly created index
+                let group_state = RowGroupState {
+                    group_by_values: group_rows[row].clone(),
+                    aggregation_buffer: vec![0; state_layout.fixed_part_width()],
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
+    }
+
+    // Collect all indices + offsets based on keys in this vec
+    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+    let mut offsets = vec![0];
+    let mut offset_so_far = 0;
+    for group_idx in groups_with_rows.iter() {
+        let indices = &aggr_state.group_states[*group_idx].indices;
+        batch_indices.append_slice(indices)?;
+        offset_so_far += indices.len();
+        offsets.push(offset_so_far);
+    }
+    let batch_indices = batch_indices.finish();
+
+    // `Take` all values based on indices into Arrays
+    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+        .iter()
+        .map(|array| {
+            array
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        &batch_indices,
+                        None, // None: no index check
+                    )
+                    .unwrap()
+                })
+                .collect()
+            // 2.3
+        })
+        .collect();
+
+    // 2.1 for each key in this batch
+    // 2.2 for each aggregation
+    // 2.3 `slice` from each of its arrays the keys' values
+    // 2.4 update / merge the accumulator with the values
+    // 2.5 clear indices
+    groups_with_rows
+        .iter()
+        .zip(offsets.windows(2))
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut aggr_state.group_states[*group_idx];
+            // 2.2
+            accumulators
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =
+                        RowAccessor::new_from_layout(state_layout.clone());
+                    state_accessor
+                        .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                    match mode {
+                        AggregateMode::Partial => {
+                            accumulator.update_batch(&values, &mut state_accessor)
+                        }
+                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                            // note: the aggregation here is over states, not values, thus the merge
+                            accumulator.merge_batch(&values, &mut state_accessor)
+                        }
+                    }
+                })
+                // 2.5
+                .and({
+                    group_state.indices.clear();
+                    Ok(())
+                })
+        })?;
+
+    Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct RowGroupState {
+    /// The actual group by values, stored sequentially
+    group_by_values: Vec<u8>,
+
+    /// Accumulator state, stored sequentially
+    aggregation_buffer: Vec<u8>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct AggregationState {
+    /// Logically maps group values to an index in `group_states`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
+
+    /// State for each group
+    group_states: Vec<RowGroupState>,
+}
+
+impl std::fmt::Debug for AggregationState {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // hashes are not stored inline, so we can only print the values
+        let map_string = "RawTable";
+        f.debug_struct("RowAccumulators")

Review Comment:
   ```suggestion
           f.debug_struct("AggregationState")
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   I found reviewing this code was easier after comparing with `hash.rs` as well:
   
   ```shell
   meld datafusion/core/src/physical_plan/aggregates/hash.rs datafusion/core/src/physical_plan/aggregates/row_hash.rs
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut aggr_state.group_states[*group_idx];
+            // 2.2
+            accumulators
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =

Review Comment:
   Depending on how often this is called (as the state_layout has `Vec`s, etc.), it may make sense to keep the RowAccessor in `group_state`, or else to reduce the cost of sharing `state_layout`.
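   
   As a sketch of the sharing idea (names here are hypothetical, not the `datafusion_row` API):
   
   ```rust
   use std::sync::Arc;
   
   // If the layout sits behind an Arc, handing it to a per-group accessor
   // is a refcount bump rather than a deep clone of the layout's Vecs.
   struct StateLayout {
       field_offsets: Vec<usize>,
   }
   
   struct StateAccessor {
       layout: Arc<StateLayout>,
   }
   
   fn accessor_for_group(layout: &Arc<StateLayout>) -> StateAccessor {
       StateAccessor {
           layout: Arc::clone(layout), // cheap: no Vec clone
       }
   }
   ```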



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -142,6 +147,12 @@ impl AggregateExec {
     pub fn input_schema(&self) -> SchemaRef {
         self.input_schema.clone()
     }
+
+    fn row_aggregate_supported(&self) -> bool {
+        let group_schema = group_schema(&self.schema, self.group_expr.len());
+        row_supported(&group_schema, RowType::Compact)

Review Comment:
   Is it a problem that this is checking `RowType::Compact` while the code in `GroupedHashAggregateStreamV2` uses `RowType::WordAligned`:
   
   ```rust
           let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
   ```
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The Architecture is similar to that in [`super::GroupedHashAggregateStream`] but use
+/// row format inside the HashTable to store aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedRowHashAggregateStream
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assume create_schema() always put group columns in front of aggr columns, we set
+        // col_idx_base to group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(
+                        &this.mode,
+                        &this.group_schema,
+                        &this.aggr_schema,
+                        &mut this.aggr_state,
+                        &mut this.accumulators,
+                        &this.schema,
+                    )
+                    .record_output(&this.baseline_metrics);
+
+                    timer.done();
+                    result
+                }
+            };
+
+            this.finished = true;
+            return Poll::Ready(Some(result));
+        }
+    }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStreamV2 {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]
+#[allow(clippy::too_many_arguments)]
+fn group_aggregate_batch(
+    mode: &AggregateMode,
+    random_state: &RandomState,
+    group_expr: &[Arc<dyn PhysicalExpr>],
+    accumulators: &mut [AccumulatorItemV2],
+    group_schema: &Schema,
+    state_layout: &RowLayout,
+    batch: RecordBatch,
+    aggr_state: &mut AggregationState,
+    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // evaluate the grouping expressions
+    let group_values = evaluate(group_expr, &batch)?;
+    let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+
+    // evaluate the aggregation expressions.
+    // We could evaluate them after the `take`, but since we need to evaluate all
+    // of them anyway, it is more performant to do it while they are together.
+    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+    // 1.1 construct the key from the group values
+    // 1.2 construct the mapping key if it does not exist
+    // 1.3 add the row's index to `indices`
+
+    // track which entries in `aggr_state` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
+
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let AggregationState { map, group_states } = aggr_state;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_rows[row] == group_state.group_by_values
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
+                };
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
+                // Add new entry to group_states and save newly created index
+                let group_state = RowGroupState {
+                    group_by_values: group_rows[row].clone(),
+                    aggregation_buffer: vec![0; state_layout.fixed_part_width()],
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
+    }
+
+    // Collect all indices + offsets based on keys in this vec
+    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+    let mut offsets = vec![0];
+    let mut offset_so_far = 0;
+    for group_idx in groups_with_rows.iter() {
+        let indices = &aggr_state.group_states[*group_idx].indices;
+        batch_indices.append_slice(indices)?;
+        offset_so_far += indices.len();
+        offsets.push(offset_so_far);
+    }
+    let batch_indices = batch_indices.finish();
+
+    // `Take` all values based on indices into Arrays
+    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+        .iter()
+        .map(|array| {
+            array
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        &batch_indices,
+                        None, // None: no index check
+                    )
+                    .unwrap()
+                })
+                .collect()
+            // 2.3
+        })
+        .collect();
+
+    // 2.1 for each key in this batch
+    // 2.2 for each aggregation
+    // 2.3 `slice` from each of its arrays the keys' values
+    // 2.4 update / merge the accumulator with the values
+    // 2.5 clear indices
+    groups_with_rows
+        .iter()
+        .zip(offsets.windows(2))
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut aggr_state.group_states[*group_idx];
+            // 2.2
+            accumulators
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =
+                        RowAccessor::new_from_layout(state_layout.clone());
+                    state_accessor
+                        .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                    match mode {
+                        AggregateMode::Partial => {
+                            accumulator.update_batch(&values, &mut state_accessor)
+                        }
+                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                            // note: the aggregation here is over states, not values, thus the merge
+                            accumulator.merge_batch(&values, &mut state_accessor)
+                        }
+                    }
+                })
+                // 2.5
+                .and({
+                    group_state.indices.clear();
+                    Ok(())
+                })
+        })?;
+
+    Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct RowGroupState {

Review Comment:
   ❤️ 



##########
datafusion/row/src/accessor.rs:
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Setter/Getter for row with all fixed-sized fields.
+
+use crate::layout::{RowLayout, RowType};
+use crate::validity::NullBitsFormatter;
+use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
+use arrow::datatypes::{DataType, Schema};
+use arrow::util::bit_util::{get_bit_raw, set_bit_raw};
+use datafusion_common::ScalarValue;
+
+//TODO: DRY with reader and writer
+
+/// Read the tuple `data[base_offset..]` we are currently pointing to
+pub struct RowAccessor<'a> {
+    /// Layout on how to read each field
+    layout: RowLayout,

Review Comment:
   The layout is non-trivial (it has a `Vec`) -- perhaps it is also worth making this a `layout: &RowLayout` or a `layout: Arc<RowLayout>`
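
   For illustration, a self-contained sketch of the `Arc` variant (toy types, not the actual `RowAccessor`/`RowLayout`): creating an accessor becomes a pointer copy instead of cloning the `Vec` inside the layout.

   ```rust
   use std::sync::Arc;

   /// Toy layout: the Vec is the non-trivial-to-clone part noted above.
   #[derive(Debug)]
   struct Layout {
       field_offsets: Vec<usize>,
   }

   /// Toy accessor: `Arc<Layout>` replaces a by-value `Layout` field.
   struct Accessor<'a> {
       layout: Arc<Layout>,
       data: &'a mut [u8],
   }

   fn main() {
       let layout = Arc::new(Layout { field_offsets: vec![0, 8, 16] });
       let mut buf = vec![0u8; 24];
       let acc = Accessor { layout: Arc::clone(&layout), data: &mut buf };
       println!("{} bytes, offsets {:?}", acc.data.len(), acc.layout.field_offsets);
   }
   ```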



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,
+    datatype: DataType,
+}
+
+impl SumAccumulatorV2 {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl AccumulatorV2 for SumAccumulatorV2 {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        add_to_row(&self.datatype, self.index, accessor, &sum_batch(values)?)?;

Review Comment:
   I wonder whether we need to go through `sum_batch` here (which turns the sum into a `ScalarValue`) -- perhaps we could call the appropriate sum kernel followed by a direct update
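
   For the f64 case, a rough sketch of that idea (names are illustrative; `slot` stands in for the `RowAccessor` update, not an API from this PR):

   ```rust
   use arrow::array::Float64Array;
   use arrow::compute;

   /// Sum the batch with the typed kernel and fold the primitive result
   /// straight into the row slot, skipping the ScalarValue round-trip.
   fn update_sum_f64(values: &Float64Array, slot: &mut f64) {
       if let Some(batch_sum) = compute::sum(values) {
           *slot += batch_sum;
       }
   }

   fn main() {
       let values = Float64Array::from(vec![1.0, 2.0, 3.0]);
       let mut slot = 0.0;
       update_sum_f64(&values, &mut slot);
       assert_eq!(slot, 6.0);
   }
   ```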



##########
datafusion/row/src/accessor.rs:
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Setter/Getter for row with all fixed-sized fields.
+
+use crate::layout::{RowLayout, RowType};
+use crate::validity::NullBitsFormatter;
+use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
+use arrow::datatypes::{DataType, Schema};
+use arrow::util::bit_util::{get_bit_raw, set_bit_raw};
+use datafusion_common::ScalarValue;
+
+//TODO: DRY with reader and writer

Review Comment:
   yeah, maybe consolidating with the writer would be cool



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+

Review Comment:
   It might be helpful to add the comments about the architecture from `hash.rs` 
   
   ```
   /*
   The architecture is the following:
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The architecture is similar to that of [`super::GroupedHashAggregateStream`], but it uses the
+/// row format inside the hash table to store the aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedHashAggregateStreamV2
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assume create_schema() always puts group columns in front of aggr columns; we set
+        // col_idx_base to the group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(
+                        &this.mode,
+                        &this.group_schema,
+                        &this.aggr_schema,
+                        &mut this.aggr_state,
+                        &mut this.accumulators,
+                        &this.schema,
+                    )
+                    .record_output(&this.baseline_metrics);
+
+                    timer.done();
+                    result
+                }
+            };
+
+            this.finished = true;
+            return Poll::Ready(Some(result));
+        }
+    }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStreamV2 {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]
+#[allow(clippy::too_many_arguments)]
+fn group_aggregate_batch(
+    mode: &AggregateMode,
+    random_state: &RandomState,
+    group_expr: &[Arc<dyn PhysicalExpr>],
+    accumulators: &mut [AccumulatorItemV2],
+    group_schema: &Schema,
+    state_layout: &RowLayout,
+    batch: RecordBatch,
+    aggr_state: &mut AggregationState,
+    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // evaluate the grouping expressions
+    let group_values = evaluate(group_expr, &batch)?;
+    let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);

Review Comment:
   This is cool that the group keys are also computed using row format



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The architecture is similar to that of [`super::GroupedHashAggregateStream`], but it uses the
+/// row format inside the hash table to store the aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedHashAggregateStreamV2
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assume create_schema() always puts group columns in front of aggr columns; we set
+        // col_idx_base to the group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(
+                        &this.mode,
+                        &this.group_schema,
+                        &this.aggr_schema,
+                        &mut this.aggr_state,
+                        &mut this.accumulators,
+                        &this.schema,
+                    )
+                    .record_output(&this.baseline_metrics);
+
+                    timer.done();
+                    result
+                }
+            };
+
+            this.finished = true;
+            return Poll::Ready(Some(result));
+        }
+    }
+}
+
+impl RecordBatchStream for GroupedHashAggregateStreamV2 {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// TODO: Make this a member function of [`GroupedHashAggregateStreamV2`]
+#[allow(clippy::too_many_arguments)]
+fn group_aggregate_batch(
+    mode: &AggregateMode,
+    random_state: &RandomState,
+    group_expr: &[Arc<dyn PhysicalExpr>],
+    accumulators: &mut [AccumulatorItemV2],
+    group_schema: &Schema,
+    state_layout: &RowLayout,
+    batch: RecordBatch,
+    aggr_state: &mut AggregationState,
+    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
+) -> Result<()> {
+    // evaluate the grouping expressions
+    let group_values = evaluate(group_expr, &batch)?;
+    let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+
+    // evaluate the aggregation expressions.
+    // We could evaluate them after the `take`, but since we need to evaluate all
+    // of them anyway, it is more performant to do it while they are together.
+    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;
+
+    // 1.1 construct the key from the group values
+    // 1.2 construct the mapping key if it does not exist
+    // 1.3 add the row's index to `indices`
+
+    // track which entries in `aggr_state` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
+
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let AggregationState { map, group_states } = aggr_state;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_rows[row] == group_state.group_by_values
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
+                };
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
+                // Add new entry to group_states and save newly created index
+                let group_state = RowGroupState {
+                    group_by_values: group_rows[row].clone(),
+                    aggregation_buffer: vec![0; state_layout.fixed_part_width()],
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
+    }
+
+    // Collect all indices + offsets based on keys in this vec
+    let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
+    let mut offsets = vec![0];
+    let mut offset_so_far = 0;
+    for group_idx in groups_with_rows.iter() {
+        let indices = &aggr_state.group_states[*group_idx].indices;
+        batch_indices.append_slice(indices)?;
+        offset_so_far += indices.len();
+        offsets.push(offset_so_far);
+    }
+    let batch_indices = batch_indices.finish();
+
+    // `Take` all values based on indices into Arrays
+    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
+        .iter()
+        .map(|array| {
+            array
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        &batch_indices,
+                        None, // None: no index check
+                    )
+                    .unwrap()
+                })
+                .collect()
+            // 2.3
+        })
+        .collect();
+
+    // 2.1 for each key in this batch
+    // 2.2 for each aggregation
+    // 2.3 `slice` from each of its arrays the keys' values
+    // 2.4 update / merge the accumulator with the values
+    // 2.5 clear indices
+    groups_with_rows
+        .iter()
+        .zip(offsets.windows(2))
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut aggr_state.group_states[*group_idx];
+            // 2.2
+            accumulators
+                .iter_mut()
+                .zip(values.iter())
+                .map(|(accumulator, aggr_array)| {
+                    (
+                        accumulator,
+                        aggr_array
+                            .iter()
+                            .map(|array| {
+                                // 2.3
+                                array.slice(offsets[0], offsets[1] - offsets[0])
+                            })
+                            .collect::<Vec<ArrayRef>>(),
+                    )
+                })
+                .try_for_each(|(accumulator, values)| {
+                    let mut state_accessor =
+                        RowAccessor::new_from_layout(state_layout.clone());
+                    state_accessor
+                        .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                    match mode {
+                        AggregateMode::Partial => {
+                            accumulator.update_batch(&values, &mut state_accessor)
+                        }
+                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                            // note: the aggregation here is over states, not values, thus the merge
+                            accumulator.merge_batch(&values, &mut state_accessor)
+                        }
+                    }
+                })
+                // 2.5
+                .and({
+                    group_state.indices.clear();
+                    Ok(())
+                })
+        })?;
+
+    Ok(())
+}
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct RowGroupState {
+    /// The actual group by values, stored sequentially
+    group_by_values: Vec<u8>,
+
+    /// Accumulator state, stored sequentially
+    aggregation_buffer: Vec<u8>,
+
+    /// Scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch.
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct AggregationState {
+    /// Logically maps group values to an index in `group_states`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
+
+    /// State for each group
+    group_states: Vec<RowGroupState>,
+}
+
+impl std::fmt::Debug for AggregationState {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // hashes are not stored inline, so we can only show a placeholder
+        let map_string = "RawTable";
+        f.debug_struct("RowAccumulators")
+            .field("map", &map_string)
+            .field("row_group_states", &self.group_states)

Review Comment:
   ```suggestion
               .field("group_states", &self.group_states)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r867487069


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,
+    datatype: DataType,
+}
+
+impl SumAccumulatorV2 {
+    pub fn new(index: usize, datatype: DataType) -> Self {
+        Self { index, datatype }
+    }
+}
+
+impl AccumulatorV2 for SumAccumulatorV2 {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let values = &values[0];
+        add_to_row(&self.datatype, self.index, accessor, &sum_batch(values)?)?;

Review Comment:
   Also, possibly related https://github.com/apache/arrow-datafusion/issues/2447



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #2375: WIP: Use row format for aggregate

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1113633390

   > The current PR seems scary in size, maybe I should move the physical_plan folder re-org as a separate PR first.
   
   I think that would help.
   
   Are we replacing HashAggregate completely with a new row-based aggregate, or do we want to support both? Does the hash aggregate still have advantages for some use cases? Maybe we can have a config setting for which one to use?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1118733974

   I am starting to check this out -- I'll try to finish today but I may run out of time. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1118736449

   @yjshen  do you have any benchmark numbers you can share?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866068228


##########
datafusion/core/Cargo.toml:
##########
@@ -66,7 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
 datafusion-expr = { path = "../expr", version = "7.0.0" }
 datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
 datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
-datafusion-row = { path = "../row", version = "7.0.0", optional = true }
+datafusion-row = { path = "../row", version = "7.0.0" }

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/hash_utils.rs:
##########
@@ -265,7 +265,42 @@ pub fn create_hashes<'a>(
     for hash in hashes_buffer.iter_mut() {
         *hash = 0
     }
-    return Ok(hashes_buffer);
+    Ok(hashes_buffer)
+}
+
+/// Test version of `create_row_hashes` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details
+#[cfg(feature = "force_hash_collisions")]
+pub fn create_row_hashes<'a>(
+    _rows: &[Vec<u8>],
+    _random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    Ok(hashes_buffer)
+}
+
+/// Test version of `create_row_hashes` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details

Review Comment:
   this comment does not seem to be accurate -- it is the *non*test version



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -77,6 +79,18 @@ pub trait AggregateExpr: Send + Sync + Debug {
     fn name(&self) -> &str {
         "AggregateExpr: default name"
     }
+
+    /// If the aggregate expression is supported by row format
+    fn accumulator_v2_supported(&self) -> bool {
+        false
+    }
+
+    fn create_accumulator_v2(

Review Comment:
   ```suggestion
       /// The accumulator, backed by the row format, used to accumulate values from the expressions.
       /// The accumulator expects the same number of arguments as `expressions` and must
       /// return states with the same description as `state_fields`
       fn create_accumulator_v2(
   ```



##########
datafusion/physical-expr/src/aggregate/accumulator_v2.rs:
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accumulator over row format
+
+use arrow::array::ArrayRef;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_row::accessor::RowAccessor;
+use std::fmt::Debug;
+
+pub trait AccumulatorV2: Send + Sync + Debug {

Review Comment:
   What about calling this `RowAccumulator` or `AccumulatorRow` so the name hints at the difference between it and `Accumulator`
   
   I also think it would help to add some comments here:
   
   1. This is a row-based accumulator where the internal aggregate state is stored using the row format.



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -77,6 +79,18 @@ pub trait AggregateExpr: Send + Sync + Debug {
     fn name(&self) -> &str {
         "AggregateExpr: default name"
     }
+
+    /// If the aggregate expression is supported by row format

Review Comment:
   This approach of adding two new functions to the `Aggregate` trait has the very nice property that it is backwards compatible. However, I wonder if we can clarify:
   1. Do aggregates need to implement `accumulator` if they also implement `create_accumulator_v2`?
   2. What is the benefit of creating accumulator v2 (is it faster?)
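
   For reference, a minimal sketch of why the default-method approach stays backwards compatible (illustrative trait, not the real `AggregateExpr`): existing implementations compile unchanged and simply opt out of the row path.

   ```rust
   trait AggregateExprLike {
       /// Existing, required method (unchanged).
       fn name(&self) -> &str;

       /// New capability probe; the default keeps old implementations valid.
       fn accumulator_v2_supported(&self) -> bool {
           false
       }
   }

   struct OldAggregate;

   impl AggregateExprLike for OldAggregate {
       // Compiles without mentioning the new method at all.
       fn name(&self) -> &str {
           "old"
       }
   }

   fn main() {
       assert!(!OldAggregate.accumulator_v2_supported());
   }
   ```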



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on pull request #2375: WIP: Use row format for aggregate

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1113877450

   Sorry for mixing two things into one PR. I will split this into separate PRs, one for each of these ideas:
   
   1. Promote `physical-plan/hash_aggregates.rs` to a directory and rename it to `aggregates`. We already have a hash-based implementation, `GroupedHashAggregateStream`, for aggregates with grouping keys, and a non-hash implementation for aggregates without grouping keys (it keeps a single record of state, but is named `HashAggregateStream` although it is not related to `Hash` at all).
   
   - We could further enrich the aggregation method from hash-based to sort-based at runtime when we run out of memory, as described in https://github.com/apache/arrow-datafusion/issues/1570
   
   2. Use the row format to store grouping keys and accumulator states when all accumulator states are fixed-sized. Use `Vec<ScalarValue>` for all other cases (when we have at least one variable-length accumulator state, or any of the `AggregateExpr`s does not support a row-based accumulator yet). A sketch of this selection rule follows below.
   
   >  Maybe we can have a config setting for which one to use
   
   I think the choice between row-based and `Vec<ScalarValue>`-based accumulator states will depend on row-based accumulator capability during query execution: we only use row-based aggregate states when all of the accumulators support them (and only if we are sure, based on benchmark results, that the row-based version always outperforms the `Vec<ScalarValue>` version whenever applicable).
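   
   For concreteness, a minimal self-contained sketch of that selection rule (toy types; `supports_row` is an illustrative stand-in for `accumulator_v2_supported`):
   
   ```rust
   /// Toy stand-in for an aggregate expression's row-format capability.
   struct AggExpr {
       supports_row: bool,
   }
   
   /// The row-based path is taken only when every accumulator supports it;
   /// otherwise we fall back to the Vec<ScalarValue> implementation.
   fn use_row_format(aggr_exprs: &[AggExpr]) -> bool {
       aggr_exprs.iter().all(|e| e.supports_row)
   }
   
   fn main() {
       let exprs = vec![AggExpr { supports_row: true }, AggExpr { supports_row: false }];
       // One unsupported accumulator forces the fallback path.
       assert!(!use_row_format(&exprs));
   }
   ```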
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#issuecomment-1120349397

   > Perhaps I can make this less concerning by enumerating what work remains to entirely switch to RowAggregate (and remove AggregateStream entirely).
   
   @alamb @andygrove I revisited our current row implementation and listed all the TODO items I could think of in #1861; in the process, I think we can eliminate these code duplications and steadily improve performance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866606193


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -142,6 +147,12 @@ impl AggregateExec {
     pub fn input_schema(&self) -> SchemaRef {
         self.input_schema.clone()
     }
+
+    fn row_aggregate_supported(&self) -> bool {
+        let group_schema = group_schema(&self.schema, self.group_expr.len());
+        row_supported(&group_schema, RowType::Compact)

Review Comment:
   I'm using the compact layout for the group keys and the word-aligned layout for the aggregation buffer. I should emphasize this in the docs.
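
   For concreteness, a small sketch of that split, using the constructors that appear in this PR's diff (assuming the `datafusion_row` API shown above):

   ```rust
   use arrow::datatypes::{DataType, Field, Schema};
   use datafusion_row::layout::{RowLayout, RowType};

   fn main() {
       // Compact rows: densely packed group keys, cheap to hash and compare.
       let group_schema = Schema::new(vec![Field::new("k", DataType::UInt64, false)]);
       let _key_layout = RowLayout::new(&group_schema, RowType::Compact);

       // Word-aligned rows: each field padded for in-place accumulator updates.
       let aggr_schema = Schema::new(vec![Field::new("sum", DataType::Float64, true)]);
       let _buf_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
   }
   ```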



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The architecture is similar to that of [`super::GroupedHashAggregateStream`], but it uses the
+/// row format inside the hash table to store the aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedHashAggregateStreamV2
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assume create_schema() always puts group columns in front of aggr columns; we set
+        // col_idx_base to the group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(

Review Comment:
   Yes, I plan to do this in #1570 as my next step. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

Posted by GitBox <gi...@apache.org>.
yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866727521


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -338,6 +455,42 @@ impl Accumulator for SumAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct SumAccumulatorV2 {
+    index: usize,

Review Comment:
   Added a new method `state_index(&self) -> usize` and explained its meaning in the `RowAccumulator` doc.
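
   A minimal sketch of that addition (trait and impl names are illustrative, not the full `RowAccumulator` definition):

   ```rust
   /// Each row accumulator records where its state field lives inside the
   /// word-aligned aggregation row, so updates can address the buffer directly.
   trait RowAccumulatorLike {
       fn state_index(&self) -> usize;
   }

   struct SumState {
       index: usize,
   }

   impl RowAccumulatorLike for SumState {
       fn state_index(&self) -> usize {
           self.index
       }
   }

   fn main() {
       assert_eq!(SumState { index: 0 }.state_index(), 0);
   }
   ```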



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org