You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/06/14 21:08:54 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6657: RowAccumulators support generics

alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1230166803


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   What is the reason to special case 2 accumulators?



##########
datafusion/physical-expr/src/aggregate/row_accumulator.rs:
##########
@@ -81,19 +95,321 @@ pub trait RowAccumulator: Send + Sync + Debug {
 
 /// Returns if `data_type` is supported with `RowAccumulator`
 pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
-    matches!(
-        data_type,
-        DataType::Boolean
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::Float32
-            | DataType::Float64
-            | DataType::Decimal128(_, _)
-    )
+    matches_all_supported_data_types!(data_type)
+}
+
+/// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
+pub enum RowAccumulatorItem {

Review Comment:
   If we are going to have an enum that has all the accumulators in it, I don't see much value in the `RowAccumulator` trait anymore  -- could we simply use `RowAccumulatorItem` everywhere?
   
   To keep using a trait I think you would need a function somewhere that took the input data types and instantiated an instance of `RowAccumulator` -- I may be missing some subtlety here as well.



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -718,21 +828,84 @@ impl RowAccumulator for BitXorRowAccumulator {
         Ok(())
     }
 
-    fn update_scalar_values(
-        &mut self,
-        values: &[ScalarValue],
+    fn update_single_row(

Review Comment:
   As I mentioned above, I think the code would be simpler and likely faster if there wasn't a special single-value case.
   
   It seems like, given that you have vectorized the code for updating the aggregator values, and the group operators already have a slice of `[usize]` (aka the selection vector) of values to aggregate, maybe all callsites can use the selection vector.



##########
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs:
##########
@@ -649,36 +652,59 @@ impl BoundedAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state =
-                &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
+        let mut single_value_acc_idx = vec![];
+        let mut single_row_acc_idx = vec![];
+        self.row_accumulators
+            .iter()
+            .zip(row_values.iter())
+            .enumerate()
+            .for_each(|(idx, (acc, values))| {
+                if let RowAccumulatorItem::COUNT(_) = acc {
+                    single_row_acc_idx.push(idx);
+                } else if values.len() == 1 {
+                    single_value_acc_idx.push(idx);
+                } else {
+                    single_row_acc_idx.push(idx);
+                };
+            });
+
+        if single_value_acc_idx.len() == 1 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let array1 = &row_values[acc_idx1][0];
+            let array1_dt = array1.data_type();
+            dispatch_all_supported_data_types! { impl_one_row_accumulator_dispatch, array1_dt, array1, acc_idx1, self, update_one_accumulator_with_native_value, groups_with_rows, filter_bool_array}
+        } else if single_value_acc_idx.len() == 2 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let acc_idx2 = single_value_acc_idx[1];
+            let array1 = &row_values[acc_idx1][0];
+            let array2 = &row_values[acc_idx2][0];
+            let array1_dt = array1.data_type();
+            let array2_dt = array2.data_type();
+            dispatch_all_supported_data_types_pairs! { impl_two_row_accumulators_dispatch, array1_dt, array2_dt, array1, array2, acc_idx1, acc_idx2, self, update_two_accumulator2_with_native_value, groups_with_rows, filter_bool_array}
+        } else {
+            for group_idx in groups_with_rows {
+                let group_state =
+                    &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
+                let mut state_accessor =
+                    RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+                state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                for idx in &group_state.indices {
+                    for (accumulator, values_array, filter_array) in izip!(
+                        self.row_accumulators.iter_mut(),
+                        row_values.iter(),
+                        filter_bool_array.iter()
+                    ) {
+                        accumulator.update_single_row(
+                            values_array,
+                            filter_array,
+                            *idx as usize,
+                            &mut state_accessor,
+                        )?;
                     }

Review Comment:
   As written, doesn't this code do all the type dispatch for each `idx` in this loop?
   
   Would it be possible to pass in `group_state.indices` just once so the dispatch is done just once?
   
   Something like this (would need the traits / etc updated)
   
   ```suggestion
                       for (accumulator, values_array, filter_array) in izip!(
                           self.row_accumulators.iter_mut(),
                           row_values.iter(),
                           filter_bool_array.iter()
                       ) {
                           accumulator.update_indicies(
                               values_array,
                               filter_array,
                               &group_state.indices,
                               &mut state_accessor,
                           )?;
                       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org