Posted to github@arrow.apache.org by "yjshen (via GitHub)" <gi...@apache.org> on 2023/04/23 08:12:00 UTC

[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

yjshen commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1174509849


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Create a sorted CSV exec
+    fn csv_exec_sorted(

Review Comment:
   This could be removed, since there's an identical function `crate::test::csv_exec_sorted`.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -331,10 +408,32 @@ impl AggregateExec {
                     batch_size,
                     context,
                     partition,
+                    state,
                 )?,
             ))
         }
     }
+
+    fn calc_aggregate_state(&self) -> Option<AggregationOrdering> {

Review Comment:
   We have `AggregationState` in row_hash. The word `state` might be confusing in aggregate contexts.
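   
   For example, something along these lines might read better (the name is only a suggestion; the body stays the same):
   
   ```rust
   // Possible rename sketch: the method computes an Option<AggregationOrdering>,
   // so naming it after the return type avoids overloading the word "state".
   fn calc_aggregation_ordering(&self) -> Option<AggregationOrdering> {
       // ... same body as calc_aggregate_state ...
   }
   ```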



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +912,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups in `GroupStatus::Emitted` status from `self.aggr_state.group_states`.
+    /// This status means that the result of the group has already been emitted and
+    /// we are sure that the group cannot receive any new rows.
+    fn prune(&mut self) {

Review Comment:
   This is smart!
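   
   For readers following the diff, a simplified sketch of the idea (the real method also has to keep the hash-table entries that index into `group_states` consistent):
   
   ```rust
   // Simplified sketch, not the actual implementation: drop group states whose
   // results have already been emitted, since they can no longer receive rows.
   fn prune(&mut self) {
       self.aggr_state
           .group_states
           .retain(|group| group.emit_status != GroupStatus::Emitted);
   }
   ```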



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .state
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
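+                    // Fully ordered GROUP BY: pass the input arrays to the
+                    // accumulators directly, without gathering rows through
+                    // `batch_indices` as the branch below does.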
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.state.is_some() {
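+            // With an ordered (or partially ordered) GROUP BY, any group whose ordered
+            // column values differ from those of the most recently created group can no
+            // longer receive rows, so it can be marked as ready to emit.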
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
+                for cur_group in &mut self.aggr_state.group_states {
+                    if cur_group.ordered_columns != last_ordered_columns {
+                        // This group will no longer receive values. Set its status to
+                        // GroupStatus::CanEmit, meaning we can generate its result.
+                        cur_group.emit_status = GroupStatus::CanEmit;
+                        new_result = true;
+                    }
+                }
+            }
+            if new_result {
+                self.exec_state = ExecutionState::ProducingOutput;
+            }
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `CannotEmit` means the data for the current group is not complete; new data may still arrive.
+    CannotEmit,

Review Comment:
   `GroupProgress`, `Incomplete`, `ReadyToEmit` maybe.
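   
   i.e. something like:
   
   ```rust
   // Possible alternative naming (keeping the existing `Emitted` variant):
   #[derive(Debug, PartialEq)]
   enum GroupProgress {
       /// Data for this group is not complete; new rows may still arrive.
       Incomplete,
       /// No more rows can arrive; the result for this group can be generated.
       ReadyToEmit,
       /// The result for this group has already been emitted.
       Emitted,
   }
   ```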



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Create a sorted CSV exec
+    fn csv_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        infinite_source: bool,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+
+        Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: Some(sort_exprs),
+                infinite_source,
+            },
+            false,
+            0,
+            FileCompressionType::UNCOMPRESSED,
+        ))
+    }
+
+    /// make PhysicalSortExpr with default options
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// PhysicalSortExpr with specified options
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_working_mode() -> Result<()> {
+        let test_schema = create_test_schema()?;
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST
+        // Columns d and e are not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+        ];
+        let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+        // Test cases consist of a vector of tuples, where each tuple represents a single test case.
+        // The first field of the tuple is a Vec<str> where each element represents a GROUP BY column;
+        // for instance, `vec!["a", "b"]` corresponds to GROUP BY a, b.
+        // The second field of the tuple is an Option<GroupByOrderMode>, the expected algorithm mode.
+        // None means the existing ordering is not sufficient to run the executor with any of the
+        // algorithms (a SortExec would have to be added to run it).
+        // Some(GroupByOrderMode) means the algorithm can run with the existing ordering, and it
+        // should work in the given GroupByOrderMode.
+        let test_cases = vec![

Review Comment:
   👍
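   
   For instance (illustrative values only, not the actual entries in the PR), with the input sorted on (a, b, c), grouping by a and b should map to the fully ordered mode:
   
   ```rust
   // Hypothetical single test case in the described
   // (GROUP BY columns, expected mode) shape.
   let example_case: (Vec<&str>, Option<GroupByOrderMode>) =
       (vec!["a", "b"], Some(Ordered));
   ```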



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -72,6 +74,15 @@ pub enum AggregateMode {
     Single,
 }
 
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// Some of the expressions in the GROUP BY clause have an ordering.
+    PartiallyOrdered,

Review Comment:
   `PartiallyOrdered` is left for follow-up PRs, right?


