Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/04/25 08:46:23 UTC

[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

mingmwang commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176203940


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +787,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
+                for cur_group in &mut self.aggr_state.group_states {
+                    if cur_group.ordered_columns != last_ordered_columns {
+                        // We will no longer receive value. Set status to GroupStatus::CanEmit
+                        // meaning we can generate result for this group.
+                        cur_group.status = GroupStatus::CanEmit;
+                        new_result = true;
+                    }
+                }
+            }
+            if new_result {
+                self.exec_state = ExecutionState::ProducingOutput;
+            }
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `GroupProgress` means data for current group is not complete. New data may arrive.
+    GroupProgress,
+    // `CanEmit` means data for current group is completed. And its result can emitted.
+    CanEmit,
+    // Emitted means that result for the groups is outputted. Group can be pruned from state.
+    Emitted,
+}
+
 /// The state that is built for each output group.
 #[derive(Debug)]
 pub struct GroupState {
     /// The actual group by values, stored sequentially
     group_by_values: OwnedRow,
 
+    ordered_columns: Option<Vec<ScalarValue>>,

Review Comment:
   I don't think this is efficient. We should avoid nesting a `Vec` inside each element of another `Vec` in performance-critical data structures. A `Vec` is essentially a pointer to a separate heap allocation.
   Because the `GroupState`s are held in one global `Vec`, storing `ordered_columns` in a further `Vec` per group means that accessing those members scatters reads across the heap, which gives poor cache locality.
   
   You could implement something similar to Arrow's `GenericStringArray` to get a better memory layout.
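   
   A minimal sketch of that kind of layout, not the PR's code: `GenericStringArray` keeps all string bytes in one contiguous buffer and uses an offsets buffer to mark where each element starts and ends. The same idea applied here would store every group's ordered-column values in one flat `Vec` plus a `Vec` of offsets. The names `FlatOrderedColumns`, `push`, `get`, and `differs_from_last` are hypothetical, and the generic `T` stands in for `ScalarValue`.

```rust
/// Hypothetical flattened store (illustration only, not DataFusion code).
/// All groups' ordered-column values live in one contiguous buffer;
/// `offsets[i]..offsets[i + 1]` delimits the values of group `i`.
#[derive(Debug)]
pub struct FlatOrderedColumns<T> {
    values: Vec<T>,
    offsets: Vec<usize>,
}

impl<T: PartialEq> FlatOrderedColumns<T> {
    /// Creates an empty store. `offsets` always starts with a leading 0.
    pub fn new() -> Self {
        Self { values: Vec::new(), offsets: vec![0] }
    }

    /// Appends one group's values and returns that group's index.
    pub fn push(&mut self, row: impl IntoIterator<Item = T>) -> usize {
        self.values.extend(row);
        self.offsets.push(self.values.len());
        self.offsets.len() - 2
    }

    /// Number of groups stored.
    pub fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Borrows the values of group `idx` as a slice of the shared buffer.
    /// Panics if `idx` is out of range.
    pub fn get(&self, idx: usize) -> &[T] {
        &self.values[self.offsets[idx]..self.offsets[idx + 1]]
    }

    /// Values of the most recently added group, if any.
    pub fn last(&self) -> Option<&[T]> {
        self.len().checked_sub(1).map(|i| self.get(i))
    }

    /// Indices of groups whose values differ from the last group's values.
    /// This mirrors the comparison loop in the diff, which marks such
    /// groups as ready to emit.
    pub fn differs_from_last(&self) -> Vec<usize> {
        match self.last() {
            Some(last) => (0..self.len()).filter(|&i| self.get(i) != last).collect(),
            None => Vec::new(),
        }
    }
}
```

   With this layout, the loop that compares each group against the last one walks a single buffer sequentially instead of following one pointer per group. A real implementation would also need to handle pruning emitted groups, which this sketch leaves out.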
   
   


