You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/06 09:35:42 UTC

[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #2375: Grouped Aggregate in row format

yjshen commented on code in PR #2375:
URL: https://github.com/apache/arrow-datafusion/pull/2375#discussion_r866606193


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -142,6 +147,12 @@ impl AggregateExec {
     pub fn input_schema(&self) -> SchemaRef {
         self.input_schema.clone()
     }
+
+    fn row_aggregate_supported(&self) -> bool {
+        let group_schema = group_schema(&self.schema, self.group_expr.len());
+        row_supported(&group_schema, RowType::Compact)

Review Comment:
   I'm using the compact layout for group keys and the word-aligned layout for aggregate buffer. I should emphasize this in the doc.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash aggregation through row format
+
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::vec;
+
+use ahash::RandomState;
+use futures::{
+    ready,
+    stream::{Stream, StreamExt},
+};
+
+use crate::error::Result;
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
+};
+use crate::physical_plan::hash_utils::create_row_hashes;
+use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
+use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+
+use arrow::compute::cast;
+use arrow::datatypes::Schema;
+use arrow::{array::ArrayRef, compute};
+use arrow::{
+    array::{Array, UInt32Builder},
+    error::{ArrowError, Result as ArrowResult},
+};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
+use datafusion_row::accessor::RowAccessor;
+use datafusion_row::layout::RowLayout;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::writer::{write_row, RowWriter};
+use datafusion_row::{MutableRecordBatch, RowType};
+use hashbrown::raw::RawTable;
+
+/// Grouping aggregate with row format to store the aggregation state.
+///
+/// The Architecture is similar to that in [`super::GroupedHashAggregateStream`] but use
+/// row format inside the HashTable to store aggregation buffers.
+pub(crate) struct GroupedHashAggregateStreamV2 {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    mode: AggregateMode,
+    aggr_state: AggregationState,
+    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+
+    group_expr: Vec<Arc<dyn PhysicalExpr>>,
+    accumulators: Vec<AccumulatorItemV2>,
+
+    group_schema: SchemaRef,
+    aggr_schema: SchemaRef,
+    aggr_layout: RowLayout,
+
+    baseline_metrics: BaselineMetrics,
+    random_state: RandomState,
+    finished: bool,
+}
+
+fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+impl GroupedHashAggregateStreamV2 {
+    /// Create a new GroupedRowHashAggregateStream
+    pub fn new(
+        mode: AggregateMode,
+        schema: SchemaRef,
+        group_expr: Vec<Arc<dyn PhysicalExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        input: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Result<Self> {
+        let timer = baseline_metrics.elapsed_compute().timer();
+
+        // The expressions to evaluate the batch, one vec of expressions per aggregation.
+        // Assume create_schema() always put group columns in front of aggr columns, we set
+        // col_idx_base to group expression count.
+        let aggregate_expressions =
+            aggregates::aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
+
+        let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
+
+        let group_schema = group_schema(&schema, group_expr.len());
+        let aggr_schema = aggr_state_schema(&aggr_expr)?;
+
+        let aggr_layout = RowLayout::new(&aggr_schema, RowType::WordAligned);
+        timer.done();
+
+        Ok(Self {
+            schema,
+            mode,
+            input,
+            group_expr,
+            accumulators,
+            group_schema,
+            aggr_schema,
+            aggr_layout,
+            baseline_metrics,
+            aggregate_expressions,
+            aggr_state: Default::default(),
+            random_state: Default::default(),
+            finished: false,
+        })
+    }
+}
+
+impl Stream for GroupedHashAggregateStreamV2 {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let this = &mut *self;
+        if this.finished {
+            return Poll::Ready(None);
+        }
+
+        let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+        loop {
+            let result = match ready!(this.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let timer = elapsed_compute.timer();
+                    let result = group_aggregate_batch(
+                        &this.mode,
+                        &this.random_state,
+                        &this.group_expr,
+                        &mut this.accumulators,
+                        &this.group_schema,
+                        &this.aggr_layout,
+                        batch,
+                        &mut this.aggr_state,
+                        &this.aggregate_expressions,
+                    );
+
+                    timer.done();
+
+                    match result {
+                        Ok(_) => continue,
+                        Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
+                    }
+                }
+                Some(Err(e)) => Err(e),
+                None => {
+                    this.finished = true;
+                    let timer = this.baseline_metrics.elapsed_compute().timer();
+                    let result = create_batch_from_map(

Review Comment:
   Yes, I plan to do this in #1570 as my next step. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org