Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/24 14:11:55 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   Why does this signature need to change? The new signature requires an allocation / `clone` of a `Vec` where the previous one didn't, so this seems to change the API for the worse.
   
   If it is to support a calculated output in the grouping, perhaps we can calculate the output ordering once in the constructor rather than on demand.
   
   ```
   
       fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
           self.calc_aggregation_ordering().map(|state| state.ordering)
       }
   ```
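   
   A minimal sketch of the constructor-time alternative, assuming stand-in types: `SortExpr` and `OrderedExec` are hypothetical names used only for illustration, not DataFusion types, and the prefix rule in `new` is an assumed placeholder for whatever `calc_aggregation_ordering` actually computes. The point is only that a cached `Option<Vec<_>>` can be handed out as `Option<&[_]>` without cloning:
   
   ```rust
   /// Hypothetical stand-in for `PhysicalSortExpr`.
   #[derive(Debug, Clone, PartialEq)]
   struct SortExpr {
       column: String,
       descending: bool,
   }
   
   /// Hypothetical operator that caches its output ordering.
   struct OrderedExec {
       ordering: Option<Vec<SortExpr>>,
   }
   
   impl OrderedExec {
       /// Computes the ordering once, at construction time.
       /// Assumed rule (illustration only): keep the longest prefix of the
       /// input ordering whose columns all appear in the GROUP BY list.
       fn new(group_by: &[&str], input_ordering: &[SortExpr]) -> Self {
           let prefix: Vec<SortExpr> = input_ordering
               .iter()
               .take_while(|e| group_by.iter().any(|g| *g == e.column))
               .cloned()
               .collect();
           let ordering = if prefix.is_empty() { None } else { Some(prefix) };
           Self { ordering }
       }
   
       /// Borrows the cached ordering; no allocation or clone per call.
       fn output_ordering(&self) -> Option<&[SortExpr]> {
           self.ordering.as_deref()
       }
   }
   ```
   
   With this shape the original `Option<&[PhysicalSortExpr]>` signature could stay unchanged, since `Option::as_deref` turns `&Option<Vec<T>>` into `Option<&[T]>`.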



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {

Review Comment:
   I may be misunderstanding this code, but it seems to track, per group, whether the group can be emitted. See the `“Partial Streaming” / “Partitioned Streaming”` section of https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit#heading=h.uapxuhfa9wyi
   
   As I understand that section, the entire hash table could be flushed each time a new value of `date` is seen:
   
   ![Screenshot 2023-04-24 at 10 08 15 AM](https://user-images.githubusercontent.com/490673/234021622-3d2eb19f-153c-4ba8-889c-5efd99c80424.png)
   
   Perhaps this could be vectorized in the obvious way, by checking only on record batch boundaries, or something similar.
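   
   A rough sketch of the "flush everything when the ordered column changes" idea, assuming input sorted by `date` and a `SUM` grouped by `(date, key)`. `PartialStreamingSum` and its methods are hypothetical names for illustration, not code from this PR, and the check is done per row here for simplicity rather than per record batch:
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical aggregator: SUM(value) GROUP BY (date, key),
   /// where the input is assumed to arrive sorted by `date`.
   struct PartialStreamingSum {
       current_date: Option<i32>,
       groups: HashMap<String, i64>,
   }
   
   impl PartialStreamingSum {
       fn new() -> Self {
           Self { current_date: None, groups: HashMap::new() }
       }
   
       /// Feeds one row. If `date` differs from the previous row's date,
       /// every group in the table is final, so the whole table is flushed
       /// and returned before the new row is accumulated.
       fn push(&mut self, date: i32, key: &str, value: i64) -> Vec<(i32, String, i64)> {
           let emitted = match self.current_date {
               Some(prev) if prev != date => self.flush(),
               _ => Vec::new(),
           };
           self.current_date = Some(date);
           *self.groups.entry(key.to_string()).or_insert(0) += value;
           emitted
       }
   
       /// Drains the entire hash table (also used at end of input).
       /// The output is sorted only to make the result deterministic.
       fn flush(&mut self) -> Vec<(i32, String, i64)> {
           let date = match self.current_date.take() {
               Some(d) => d,
               None => return Vec::new(),
           };
           let mut out: Vec<(i32, String, i64)> =
               self.groups.drain().map(|(k, v)| (date, k, v)).collect();
           out.sort();
           out
       }
   }
   ```
   
   No per-group "can be emitted" flag is needed in this shape: a change in the ordered column is the only signal, and it applies to the whole table at once.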
   
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -313,22 +331,144 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+    // Update the aggr_state according to groub_by values (result of group_by_expressions) when group by

Review Comment:
   minor nit: `groub_by` --> `group_by`


