Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/06/13 06:31:59 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #6657: RowAccumulator support generics

mingmwang opened a new pull request, #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1228954668


##########
datafusion/row/src/reader.rs:
##########
@@ -142,7 +140,8 @@ impl<'a> RowReader<'a> {
         self.data = data;
     }
 
-    #[inline]
+    #[allow(dead_code)]
+    #[inline(always)]
     fn assert_index_valid(&self, idx: usize) {

Review Comment:
   This is not used in the current code. I left it here because the check can still be invoked once for each record batch.





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241222042


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   > After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed.
   
   I'm afraid the typecast cannot be avoided. The arrow `Array` trait itself does not provide any `read` function to read its value at a given `index`, so the trait object must first be downcast to a concrete struct (`PrimitiveArray<T: ArrowPrimitiveType>` or `BooleanArray`).
   
   I think this is related to the restrictions on trait objects. If we add generic methods (such as reading the value at a given index) or a `GAT` to a trait, that trait can no longer be used as a trait object.
   This is also why I added a new trait in this PR. The new trait and its implementors bridge the type system between the arrow `ArrowPrimitiveType` and the row accumulator internal state types.
   
   ```rust
   pub trait ArrowArrayReader: Array {
       type Item: RowAccumulatorNativeType;
   
       /// Returns the element at `index`
       /// # Panics
       /// Panics if `index` is outside the bounds of the array
       fn value_at(&self, index: usize) -> Self::Item;
   
       /// Returns the element at `index` without bounds checking
       /// # Safety
       /// Caller is responsible for ensuring that `index` is within the bounds of the array
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item;
   }
   
   impl<'a> ArrowArrayReader for &'a BooleanArray {
       type Item = bool;
   
       #[inline]
       fn value_at(&self, index: usize) -> Self::Item {
           BooleanArray::value(self, index)
       }
   
       #[inline]
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
           BooleanArray::value_unchecked(self, index)
       }
   }
   
   impl<'a, T: ArrowPrimitiveType> ArrowArrayReader for &'a PrimitiveArray<T>
   where
       <T as ArrowPrimitiveType>::Native: RowAccumulatorNativeType,
   {
       type Item = T::Native;
   
       #[inline]
       fn value_at(&self, index: usize) -> Self::Item {
           PrimitiveArray::value(self, index)
       }
   
       #[inline]
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
           PrimitiveArray::value_unchecked(self, index)
       }
   }
   
   ```
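   
   To illustrate the idea in isolation, here is a minimal self-contained sketch that does not depend on arrow. `ColumnReader`, `BoolColumn`, `I64Column` and `max_at` are hypothetical stand-ins (for `ArrowArrayReader`, `BooleanArray` and `PrimitiveArray<T>`), not the code in this PR. The associated `Item` type is what lets one generic function read native values directly:
   
   ```rust
   /// Hypothetical stand-in for `ArrowArrayReader`: typed access by index.
   pub trait ColumnReader {
       type Item: Copy + PartialOrd;
   
       /// Returns the element at `index`, panicking when out of bounds.
       fn value_at(&self, index: usize) -> Self::Item;
   }
   
   /// Hypothetical stand-ins for `BooleanArray` and an Int64 `PrimitiveArray`.
   pub struct BoolColumn(pub Vec<bool>);
   pub struct I64Column(pub Vec<i64>);
   
   impl ColumnReader for BoolColumn {
       type Item = bool;
       fn value_at(&self, index: usize) -> bool {
           self.0[index]
       }
   }
   
   impl ColumnReader for I64Column {
       type Item = i64;
       fn value_at(&self, index: usize) -> i64 {
           self.0[index]
       }
   }
   
   /// Generic over the reader: monomorphized per concrete column type, so
   /// there is no per-value type dispatch inside the loop.
   pub fn max_at<R: ColumnReader>(reader: &R, indices: &[u32]) -> Option<R::Item> {
       let mut best: Option<R::Item> = None;
       for &idx in indices {
           let v = reader.value_at(idx as usize);
           best = match best {
               Some(b) if b >= v => Some(b),
               _ => Some(v),
           };
       }
       best
   }
   ```
   
   The caller still has to pick the concrete column type once (the downcast), but everything after that point is statically typed.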
   
   
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1242059338


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -496,37 +497,57 @@ impl GroupedHashAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
+        let mut single_value_acc_idx = vec![];

Review Comment:
   @alamb 
   I have combined the RowAccumulators update logic in `row_hash.rs` and `bounded_aggregate_stream.rs` to avoid the copy/paste.
   There is still some other duplicated code (not introduced by this PR); I would prefer to leave it for a future PR.





[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1228679504


##########
datafusion/row/src/reader.rs:
##########
@@ -142,7 +140,8 @@ impl<'a> RowReader<'a> {
         self.data = data;
     }
 
-    #[inline]
+    #[allow(dead_code)]
+    #[inline(always)]
     fn assert_index_valid(&self, idx: usize) {

Review Comment:
   Is this still needed?





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1596975545

   Here are my measurements, which are consistent with the information posted above
   
   ```
   --------------------
   Benchmark tpch_mem.json
   --------------------
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃ main_base ┃ agg_framework ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │  516.91ms │      530.19ms │     no change │
   │ QQuery 2     │  267.38ms │      272.23ms │     no change │
   │ QQuery 3     │  176.90ms │      183.52ms │     no change │
   │ QQuery 4     │  112.13ms │      114.36ms │     no change │
   │ QQuery 5     │  469.13ms │      489.38ms │     no change │
   │ QQuery 6     │   40.78ms │       39.62ms │     no change │
   │ QQuery 7     │ 1140.46ms │     1172.60ms │     no change │
   │ QQuery 8     │  250.55ms │      247.42ms │     no change │
   │ QQuery 9     │  639.51ms │      626.28ms │     no change │
   │ QQuery 10    │  345.85ms │      335.14ms │     no change │
   │ QQuery 11    │  277.08ms │      269.83ms │     no change │
   │ QQuery 12    │  167.45ms │      173.82ms │     no change │
   │ QQuery 13    │  660.02ms │      638.75ms │     no change │
   │ QQuery 14    │   52.18ms │       54.76ms │     no change │
   │ QQuery 15    │   85.71ms │       81.79ms │     no change │
   │ QQuery 16    │  222.74ms │      247.64ms │  1.11x slower │
   │ QQuery 17    │ 2760.35ms │     2393.44ms │ +1.15x faster │
   │ QQuery 18    │ 2978.54ms │     2981.99ms │     no change │
   │ QQuery 19    │  175.93ms │      164.40ms │ +1.07x faster │
   │ QQuery 20    │  859.35ms │      872.16ms │     no change │
   │ QQuery 21    │ 1526.80ms │     1419.54ms │ +1.08x faster │
   │ QQuery 22    │  146.40ms │      146.43ms │     no change │
   └──────────────┴───────────┴───────────────┴───────────────┘
   ```




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1232216074


##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -718,21 +828,84 @@ impl RowAccumulator for BitXorRowAccumulator {
         Ok(())
     }
 
-    fn update_scalar_values(
-        &mut self,
-        values: &[ScalarValue],
+    fn update_single_row(

Review Comment:
   I had updated the interface.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241288862


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   Sorry -- I misunderstood -- I thought you meant that, for example, when aggregating SUM(Int16) the argument needed to be cast to `Int32` or something, as the type of the aggregator was different than the type of the underlying value in the accumulator
   
   I agree the downcasting is needed to figure out the correct function to call -- though the more I think about it the more I think it can / should be done during setup time and not on every batch (as the types don't change during the query execution)
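   
   A rough sketch of what resolving the dispatch at setup time could look like, outside of DataFusion. Everything here (`DataType`, `Column`, `Int32Column`, `Int64Column`, `SumFn`, `resolve_sum`) is a hypothetical stand-in rather than an existing API; the assumption is only that the input type is known when the stream is created and does not change afterwards:
   
   ```rust
   use std::any::Any;
   
   /// Hypothetical stand-in for arrow's `DataType`.
   #[derive(Clone, Copy, Debug)]
   pub enum DataType {
       Int32,
       Int64,
   }
   
   /// Hypothetical stand-in for `dyn Array`.
   pub trait Column {
       fn as_any(&self) -> &dyn Any;
   }
   
   pub struct Int32Column(pub Vec<i32>);
   pub struct Int64Column(pub Vec<i64>);
   
   impl Column for Int32Column {
       fn as_any(&self) -> &dyn Any {
           self
       }
   }
   
   impl Column for Int64Column {
       fn as_any(&self) -> &dyn Any {
           self
       }
   }
   
   /// Update function chosen once, when the query is set up.
   pub type SumFn = fn(&dyn Column, &mut i64);
   
   fn sum_i32(col: &dyn Column, state: &mut i64) {
       // One downcast per batch; the loop below is fully typed.
       let col = col
           .as_any()
           .downcast_ref::<Int32Column>()
           .expect("column type was fixed at setup");
       for v in &col.0 {
           *state += i64::from(*v);
       }
   }
   
   fn sum_i64(col: &dyn Column, state: &mut i64) {
       let col = col
           .as_any()
           .downcast_ref::<Int64Column>()
           .expect("column type was fixed at setup");
       for v in &col.0 {
           *state += *v;
       }
   }
   
   /// Setup time: map the (unchanging) input type to its update function.
   pub fn resolve_sum(data_type: DataType) -> SumFn {
       match data_type {
           DataType::Int32 => sum_i32,
           DataType::Int64 => sum_i64,
       }
   }
   ```
   
   With this shape the `match` on the data type runs once per query, and each batch only pays for a single downcast before entering a typed loop.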





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1589986532

   This looks exciting @mingmwang  -- I will review this tomorrow




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1597880775

   > I am running some benchmarks on this branch -- I also hope to find some time next week to help simplify this PR as the basic idea is really cool and very much needed
   
   Yes, please help me to simplify this PR.
   I will resolve the conflicts today and also take a look at `q16` and check why it is slower.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241226010


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -496,37 +497,57 @@ impl GroupedHashAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
+        let mut single_value_acc_idx = vec![];

Review Comment:
   We definitely should address the duplicate code between the row_hash.rs and bounded_aggregate_stream.rs.
   





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1608071805

   @tustvold  and I had a long chat about this and we have an alternate proposal that removes the per-group overhead entirely: 
   
   
   https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1590419290

   ![flamegraph](https://github.com/apache/arrow-datafusion/assets/10591925/c2d9c9db-7910-4457-b619-4f457719bf44)
   




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1230261924


##########
datafusion/physical-expr/src/aggregate/row_accumulator.rs:
##########
@@ -81,19 +95,321 @@ pub trait RowAccumulator: Send + Sync + Debug {
 
 /// Returns if `data_type` is supported with `RowAccumulator`
 pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
-    matches!(
-        data_type,
-        DataType::Boolean
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::Float32
-            | DataType::Float64
-            | DataType::Decimal128(_, _)
-    )
+    matches_all_supported_data_types!(data_type)
+}
+
+/// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
+pub enum RowAccumulatorItem {

Review Comment:
   I think we can still use a trait to define a common interface for all the row accumulators; we just cannot use trait objects anymore.
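   
   As a small illustration of that point, here is a self-contained sketch with hypothetical names (`NativeValue`, `Accumulator`, `SumAcc`, `MaxAcc`, `AccumulatorItem`); it is not the `RowAccumulatorItem` from this PR. Because `update_value` is generic, `Box<dyn Accumulator>` would not compile, so an enum forwards each call to the concrete accumulator instead:
   
   ```rust
   /// Hypothetical native value type that accumulators can consume.
   pub trait NativeValue: Copy {
       fn to_f64(self) -> f64;
   }
   
   impl NativeValue for i32 {
       fn to_f64(self) -> f64 {
           f64::from(self)
       }
   }
   
   impl NativeValue for f64 {
       fn to_f64(self) -> f64 {
           self
       }
   }
   
   /// Common interface. The generic method makes this trait unusable as a
   /// trait object, but it can still be implemented and called statically.
   pub trait Accumulator {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut f64);
   }
   
   pub struct SumAcc;
   pub struct MaxAcc;
   
   impl Accumulator for SumAcc {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut f64) {
           if let Some(v) = value {
               *state += v.to_f64();
           }
       }
   }
   
   impl Accumulator for MaxAcc {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut f64) {
           if let Some(v) = value {
               *state = (*state).max(v.to_f64());
           }
       }
   }
   
   /// Enum dispatch in place of `Box<dyn Accumulator>`.
   pub enum AccumulatorItem {
       Sum(SumAcc),
       Max(MaxAcc),
   }
   
   impl AccumulatorItem {
       pub fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut f64) {
           match self {
               AccumulatorItem::Sum(acc) => acc.update_value(value, state),
               AccumulatorItem::Max(acc) => acc.update_value(value, state),
           }
       }
   }
   ```
   
   The trade-off is that every new accumulator needs a new enum variant and a new `match` arm, whereas a trait object would have picked it up automatically.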





[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1228679232


##########
datafusion/row/src/writer.rs:
##########
@@ -139,7 +137,8 @@ impl RowWriter {
         self.row_width = self.layout.fixed_part_width();
     }
 
-    #[inline]
+    #[allow(dead_code)]
+    #[inline(always)]
     fn assert_index_valid(&self, idx: usize) {

Review Comment:
   Is this still needed?





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1231718662


##########
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs:
##########
@@ -649,36 +652,59 @@ impl BoundedAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state =
-                &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
+        let mut single_value_acc_idx = vec![];
+        let mut single_row_acc_idx = vec![];
+        self.row_accumulators
+            .iter()
+            .zip(row_values.iter())
+            .enumerate()
+            .for_each(|(idx, (acc, values))| {
+                if let RowAccumulatorItem::COUNT(_) = acc {
+                    single_row_acc_idx.push(idx);
+                } else if values.len() == 1 {
+                    single_value_acc_idx.push(idx);
+                } else {
+                    single_row_acc_idx.push(idx);
+                };
+            });
+
+        if single_value_acc_idx.len() == 1 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let array1 = &row_values[acc_idx1][0];
+            let array1_dt = array1.data_type();
+            dispatch_all_supported_data_types! { impl_one_row_accumulator_dispatch, array1_dt, array1, acc_idx1, self, update_one_accumulator_with_native_value, groups_with_rows, filter_bool_array}
+        } else if single_value_acc_idx.len() == 2 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let acc_idx2 = single_value_acc_idx[1];
+            let array1 = &row_values[acc_idx1][0];
+            let array2 = &row_values[acc_idx2][0];
+            let array1_dt = array1.data_type();
+            let array2_dt = array2.data_type();
+            dispatch_all_supported_data_types_pairs! { impl_two_row_accumulators_dispatch, array1_dt, array2_dt, array1, array2, acc_idx1, acc_idx2, self, update_two_accumulator2_with_native_value, groups_with_rows, filter_bool_array}
+        } else {
+            for group_idx in groups_with_rows {
+                let group_state =
+                    &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
+                let mut state_accessor =
+                    RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+                state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                for idx in &group_state.indices {
+                    for (accumulator, values_array, filter_array) in izip!(
+                        self.row_accumulators.iter_mut(),
+                        row_values.iter(),
+                        filter_bool_array.iter()
+                    ) {
+                        accumulator.update_single_row(
+                            values_array,
+                            filter_array,
+                            *idx as usize,
+                            &mut state_accessor,
+                        )?;
                     }

Review Comment:
   > As written, doesn't this code do all the type dispatch for each `idx` in this loop?
   > 
   > Would it be possible to pass in `group_state.indices` just once so the dispatch is done just once?
   > 
   > Something like this (would need the traits / etc updated)
   
   I will try the approach you suggested. But even with this change, the type dispatch overhead is still heavy because it is performed on every update of the groups with rows.
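   
   To make the difference concrete, here is a minimal sketch with a hypothetical `Column` enum standing in for a dynamically typed array (it is not the DataFusion code). The first function dispatches on the type for every index, the second dispatches once and then loops over all of a group's indices:
   
   ```rust
   /// Hypothetical stand-in for a dynamically typed arrow array.
   pub enum Column {
       Int64(Vec<i64>),
       Float64(Vec<f64>),
   }
   
   /// Per-row dispatch: the `match` on the column type runs once per index.
   pub fn sum_dispatch_per_row(col: &Column, indices: &[u32]) -> f64 {
       let mut sum = 0.0;
       for &idx in indices {
           sum += match col {
               Column::Int64(v) => v[idx as usize] as f64,
               Column::Float64(v) => v[idx as usize],
           };
       }
       sum
   }
   
   /// Per-group dispatch: the `match` runs once, then a typed loop reads values.
   pub fn sum_dispatch_per_group(col: &Column, indices: &[u32]) -> f64 {
       match col {
           Column::Int64(v) => indices.iter().map(|&i| v[i as usize] as f64).sum::<f64>(),
           Column::Float64(v) => indices.iter().map(|&i| v[i as usize]).sum::<f64>(),
       }
   }
   ```
   
   Both functions return the same result; the second simply moves the branch out of the inner loop, although it is still evaluated once per group on every call.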
   
   
   
    
   





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1596165745

   I am running some benchmarks on this branch -- I also hope to find some time next week to help simplify this PR as the basic idea is really cool and very much needed




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241126875


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +785,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(

Review Comment:
   This code is basically a copy of what is in `datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs`, right?



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -496,37 +497,57 @@ impl GroupedHashAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
+        let mut single_value_acc_idx = vec![];

Review Comment:
   FYI @ozankabak  and @mustafasrepo  -- here is an example where having two grouping operators (hash and bounded) requires 2x the code - not sure if we can find a way to reduce the duplication



##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! matches_all_supported_data_types {
+    ($expression:expr) => {
+        matches!(
+            $expression,
+            DataType::Boolean
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+                | DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::Float32
+                | DataType::Float64
+                | DataType::Decimal128(_, _)
+        )
+    };
+}
+pub use matches_all_supported_data_types;
+
+#[macro_export]
+macro_rules! dispatch_all_supported_data_types {
+    ($macro:ident $(, $x:ident)*) => {
+        $macro! {
+                [$($x),*],
+                { Boolean, BooleanArray },
+                { Int8, Int8Array },
+                { Int16, Int16Array },
+                { Int32, Int32Array },
+                { Int64, Int64Array },
+                { UInt8, UInt8Array },
+                { UInt16, UInt16Array },
+                { UInt32, UInt32Array },
+                { UInt64, UInt64Array },
+                { Float32, Float32Array },
+                { Float64, Float64Array },
+                { Decimal128, Decimal128Array }
+        }
+    };
+}
+
+pub use dispatch_all_supported_data_types;
+
+// TODO generate the matching type pairs
+#[macro_export]

Review Comment:
   I really worry about this table -- it seems like it will be very hard to maintain (aka trying to add some new type or accumulator type will be very hard)



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1230166803


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   What is the reason to special case 2 accumulators?



##########
datafusion/physical-expr/src/aggregate/row_accumulator.rs:
##########
@@ -81,19 +95,321 @@ pub trait RowAccumulator: Send + Sync + Debug {
 
 /// Returns if `data_type` is supported with `RowAccumulator`
 pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
-    matches!(
-        data_type,
-        DataType::Boolean
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::Float32
-            | DataType::Float64
-            | DataType::Decimal128(_, _)
-    )
+    matches_all_supported_data_types!(data_type)
+}
+
+/// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
+pub enum RowAccumulatorItem {

Review Comment:
   If we are going to have an enum that has all the accumulators in it, I don't see much value in the `RowAccumulator` trait anymore  -- could we simply use `RowAccumulatorItem` everywhere?
   
   To keep using a trait I think you would need a function somewhere that took the input data types and instantiated an instance of `RowAccumulator` -- I may be missing some subtlety here as well.
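A rough sketch of the "function that takes the input data type and instantiates an accumulator" idea, assuming an object-safe trait with no generic methods. The `DataType`, `Column`, `Accumulator`, and `create_sum` names below are simplified, hypothetical stand-ins and not the arrow or DataFusion types.

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Float64,
    Utf8,
}

/// Hypothetical stand-in for a dynamically typed input column.
enum Column {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
}

/// Object-safe trait: no generic methods, so `Box<dyn Accumulator>` is allowed.
trait Accumulator {
    fn update_indices(&mut self, column: &Column, indices: &[usize]);
    fn evaluate(&self) -> f64;
}

#[derive(Default)]
struct SumInt64 {
    sum: i64,
}

#[derive(Default)]
struct SumFloat64 {
    sum: f64,
}

impl Accumulator for SumInt64 {
    fn update_indices(&mut self, column: &Column, indices: &[usize]) {
        // One type check per call; a mismatched column is ignored in this sketch.
        if let Column::Int64(values) = column {
            for &i in indices {
                self.sum += values[i];
            }
        }
    }
    fn evaluate(&self) -> f64 {
        self.sum as f64
    }
}

impl Accumulator for SumFloat64 {
    fn update_indices(&mut self, column: &Column, indices: &[usize]) {
        if let Column::Float64(values) = column {
            for &i in indices {
                self.sum += values[i];
            }
        }
    }
    fn evaluate(&self) -> f64 {
        self.sum
    }
}

/// Factory: the data type is inspected once, when the accumulator is built.
fn create_sum(data_type: &DataType) -> Option<Box<dyn Accumulator>> {
    match data_type {
        DataType::Int64 => Some(Box::new(SumInt64::default())),
        DataType::Float64 => Some(Box::new(SumFloat64::default())),
        DataType::Utf8 => None, // unsupported in this sketch
    }
}
```

The trade-off against the enum is that each call goes through a vtable, but the per-row work inside `update_indices` is fully typed.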



##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -718,21 +828,84 @@ impl RowAccumulator for BitXorRowAccumulator {
         Ok(())
     }
 
-    fn update_scalar_values(
-        &mut self,
-        values: &[ScalarValue],
+    fn update_single_row(

Review Comment:
   As I mentioned above, I think the code would be simpler and likely faster if there wasn't a special single value case
   
   It seems like, given that you have vectorized the code for updating the aggregator values, and the group operators already have a slice of `[usize]` (aka the selection vector) of values to aggregate, maybe all callsites can use the selection vector
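To make the selection-vector idea concrete, here is a minimal sketch in which one call folds all selected rows of a group into that group's state. `SumState` and `update_indices` are hypothetical names, and plain slices stand in for arrow arrays and the row-format buffer.

```rust
/// Hypothetical stand-in for the aggregation buffer of one group.
#[derive(Debug, Default, PartialEq)]
struct SumState {
    sum: i64,
    count: u64,
}

/// Fold every selected row into `state` in a single call.
///
/// `values` is the whole input column (`None` marks a null), `filter` is an
/// optional per-row predicate, and `indices` is the selection vector of rows
/// that belong to one group.
fn update_indices(
    values: &[Option<i64>],
    filter: Option<&[bool]>,
    indices: &[usize],
    state: &mut SumState,
) {
    for &idx in indices {
        // Rows rejected by the filter are skipped.
        if let Some(f) = filter {
            if !f[idx] {
                continue;
            }
        }
        // Null inputs are skipped.
        if let Some(v) = values[idx] {
            state.sum += v;
            state.count += 1;
        }
    }
}

/// Usage: two groups sharing one input column.
fn demo_selection_vector() -> (SumState, SumState) {
    let values: [Option<i64>; 5] = [Some(1), None, Some(3), Some(4), Some(5)];
    let filter = [true, true, true, false, true];
    let mut g0 = SumState::default();
    let mut g1 = SumState::default();
    update_indices(&values, Some(&filter[..]), &[0, 1, 2], &mut g0);
    update_indices(&values, Some(&filter[..]), &[3, 4], &mut g1);
    (g0, g1)
}
```

With this shape there is no separate single-value path: a group with one row is simply a selection vector of length one.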



##########
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs:
##########
@@ -649,36 +652,59 @@ impl BoundedAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state =
-                &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
+        let mut single_value_acc_idx = vec![];
+        let mut single_row_acc_idx = vec![];
+        self.row_accumulators
+            .iter()
+            .zip(row_values.iter())
+            .enumerate()
+            .for_each(|(idx, (acc, values))| {
+                if let RowAccumulatorItem::COUNT(_) = acc {
+                    single_row_acc_idx.push(idx);
+                } else if values.len() == 1 {
+                    single_value_acc_idx.push(idx);
+                } else {
+                    single_row_acc_idx.push(idx);
+                };
+            });
+
+        if single_value_acc_idx.len() == 1 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let array1 = &row_values[acc_idx1][0];
+            let array1_dt = array1.data_type();
+            dispatch_all_supported_data_types! { impl_one_row_accumulator_dispatch, array1_dt, array1, acc_idx1, self, update_one_accumulator_with_native_value, groups_with_rows, filter_bool_array}
+        } else if single_value_acc_idx.len() == 2 && single_row_acc_idx.is_empty() {
+            let acc_idx1 = single_value_acc_idx[0];
+            let acc_idx2 = single_value_acc_idx[1];
+            let array1 = &row_values[acc_idx1][0];
+            let array2 = &row_values[acc_idx2][0];
+            let array1_dt = array1.data_type();
+            let array2_dt = array2.data_type();
+            dispatch_all_supported_data_types_pairs! { impl_two_row_accumulators_dispatch, array1_dt, array2_dt, array1, array2, acc_idx1, acc_idx2, self, update_two_accumulator2_with_native_value, groups_with_rows, filter_bool_array}
+        } else {
+            for group_idx in groups_with_rows {
+                let group_state =
+                    &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
+                let mut state_accessor =
+                    RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+                state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                for idx in &group_state.indices {
+                    for (accumulator, values_array, filter_array) in izip!(
+                        self.row_accumulators.iter_mut(),
+                        row_values.iter(),
+                        filter_bool_array.iter()
+                    ) {
+                        accumulator.update_single_row(
+                            values_array,
+                            filter_array,
+                            *idx as usize,
+                            &mut state_accessor,
+                        )?;
                     }

Review Comment:
   As written, doesn't this code do all the type dispatch for each `idx` in this loop?
   
   Would it be possible to pass in `group_state.indices` just once so the dispatch is done just once?
   
   Something like this (would need the traits / etc updated)
   
   ```suggestion
                       for (accumulator, values_array, filter_array) in izip!(
                           self.row_accumulators.iter_mut(),
                           row_values.iter(),
                           filter_bool_array.iter()
                       ) {
                           accumulator.update_indicies(
                               values_array,
                               filter_array,
                               &group_state.indices,
                               &mut state_accessor,
                           )?;
                       }
   ```
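The difference between dispatching per row and dispatching once per set of indices can be illustrated with a small, self-contained sketch. `Column`, `sum_per_row`, and `sum_per_batch` are hypothetical names used only for illustration.

```rust
/// Hypothetical stand-in for a dynamically typed input column.
enum Column {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
}

/// Per-row dispatch: the `match` on the column type runs once per index.
fn sum_per_row(col: &Column, indices: &[usize]) -> f64 {
    let mut total = 0.0;
    for &idx in indices {
        total += match col {
            Column::Int64(v) => v[idx] as f64,
            Column::Float64(v) => v[idx],
        };
    }
    total
}

/// Per-batch dispatch: the `match` runs once, then a typed inner loop
/// handles every index.
fn sum_per_batch(col: &Column, indices: &[usize]) -> f64 {
    match col {
        Column::Int64(v) => indices.iter().map(|&i| v[i] as f64).sum::<f64>(),
        Column::Float64(v) => indices.iter().map(|&i| v[i]).sum::<f64>(),
    }
}
```

Both functions return the same result; the second keeps the type check out of the hot loop, which is the point of passing `group_state.indices` in one call.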





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241222042


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   > After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed.
   
   I'm afraid the `typecast` cannot be avoided. This is because the arrow `Array` trait itself does not provide any `read` function to read its value at a given `index`. The trait object must first be downcast to the concrete struct (`PrimitiveArray<T: ArrowPrimitiveType>` or `BooleanArray`).
   
   I think this is related to the restrictions on trait objects. If we add generic methods (like a method that reads the value at a given index) or a `GAT` to a trait, that trait can no longer be used as a trait object.
   This is also why I added a new trait in this PR. The new trait and its implementors bridge the type system between
   the arrow `ArrowPrimitiveType` and the row accumulator's internal state types.
   
   ```rust
   pub trait ArrowArrayReader: Array {
       type Item: RowAccumulatorNativeType;
   
       /// Returns the element at `index`
       /// # Panics
       /// Panics if `index` is outside the bounds of the array
       fn value_at(&self, index: usize) -> Self::Item;
   
       /// Returns the element at `index` without bounds checking
       /// # Safety
       /// Caller is responsible for ensuring that `index` is within the bounds of the array
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item;
   }
   
   impl<'a> ArrowArrayReader for &'a BooleanArray {
       type Item = bool;
   
       #[inline]
       fn value_at(&self, index: usize) -> Self::Item {
           BooleanArray::value(self, index)
       }
   
       #[inline]
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
           BooleanArray::value_unchecked(self, index)
       }
   }
   
   impl<'a, T: ArrowPrimitiveType> ArrowArrayReader for &'a PrimitiveArray<T>
   where
       <T as ArrowPrimitiveType>::Native: RowAccumulatorNativeType,
   {
       type Item = T::Native;
   
       #[inline]
       fn value_at(&self, index: usize) -> Self::Item {
           PrimitiveArray::value(self, index)
       }
   
       #[inline]
       unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
           PrimitiveArray::value_unchecked(self, index)
       }
   }
   
   ```
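The downcast-then-generic pattern described above can be sketched without arrow at all, using only `std::any::Any`. `AnyColumn`, `ColumnReader`, `gather`, and `gather_i64` are hypothetical names; `AnyColumn` plays the role of a type-erased array and `ColumnReader` the role of a typed reader with an associated item type.

```rust
use std::any::Any;

/// Hypothetical type-erased column, playing the role of a `dyn Array`.
trait AnyColumn {
    fn as_any(&self) -> &dyn Any;
}

struct Int64Column(Vec<i64>);
struct BoolColumn(Vec<bool>);

impl AnyColumn for Int64Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl AnyColumn for BoolColumn {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Typed reader with an associated item type.
trait ColumnReader {
    type Item: Copy;
    fn value_at(&self, index: usize) -> Self::Item;
}

impl ColumnReader for Int64Column {
    type Item = i64;
    fn value_at(&self, index: usize) -> i64 {
        self.0[index]
    }
}

impl ColumnReader for BoolColumn {
    type Item = bool;
    fn value_at(&self, index: usize) -> bool {
        self.0[index]
    }
}

/// Generic code, monomorphised for each concrete reader.
fn gather<R: ColumnReader>(reader: &R, indices: &[usize]) -> Vec<R::Item> {
    indices.iter().map(|&i| reader.value_at(i)).collect()
}

/// The downcast step: recover the concrete type from the trait object
/// before the generic code can be called.
fn gather_i64(column: &dyn AnyColumn, indices: &[usize]) -> Option<Vec<i64>> {
    let typed = column.as_any().downcast_ref::<Int64Column>()?;
    Some(gather(typed, indices))
}
```

The downcast happens once per column, so its cost is independent of the number of rows read afterwards.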
   
   
   





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1611906212

   Thank you -- I hope to POC out the approach in https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287 over the next day or so (sorry I have been busy with other things -- notably creating a reproducer for https://github.com/apache/arrow-rs/issues/4459)




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241817848


##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! matches_all_supported_data_types {
+    ($expression:expr) => {
+        matches!(
+            $expression,
+            DataType::Boolean
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+                | DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::Float32
+                | DataType::Float64
+                | DataType::Decimal128(_, _)
+        )
+    };
+}
+pub use matches_all_supported_data_types;
+
+#[macro_export]
+macro_rules! dispatch_all_supported_data_types {
+    ($macro:ident $(, $x:ident)*) => {
+        $macro! {
+                [$($x),*],
+                { Boolean, BooleanArray },
+                { Int8, Int8Array },
+                { Int16, Int16Array },
+                { Int32, Int32Array },
+                { Int64, Int64Array },
+                { UInt8, UInt8Array },
+                { UInt16, UInt16Array },
+                { UInt32, UInt32Array },
+                { UInt64, UInt64Array },
+                { Float32, Float32Array },
+                { Float64, Float64Array },
+                { Decimal128, Decimal128Array }
+        }
+    };
+}
+
+pub use dispatch_all_supported_data_types;
+
+// TODO generate the matching type pairs
+#[macro_export]

Review Comment:
   > I really worry about this table -- it seems like it will be very hard to maintain (aka trying to add some new type or accumulator type will be very hard)
   
   Done. I now use nested macros to generate the body.
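For readers unfamiliar with the technique, one way nested `macro_rules!` can generate all type pairs from a single type list is sketched below. This is not the macro from the PR: `dispatch_one`, `dispatch_pair`, `dispatch_pair_inner`, `pair_name`, and the three-variant `DataType` are hypothetical and kept deliberately small.

```rust
/// Hypothetical, simplified stand-in for arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float64,
}

/// Single source of truth for the supported types. Matches on `$value` and
/// calls `$callback!` with any extra arguments followed by the matched type.
macro_rules! dispatch_one {
    ($value:expr, $callback:ident $(, $arg:tt)*) => {
        match $value {
            DataType::Int32 => $callback!($($arg,)* Int32),
            DataType::Int64 => $callback!($($arg,)* Int64),
            DataType::Float64 => $callback!($($arg,)* Float64),
        }
    };
}

/// Second level: the left type is already known, now dispatch on the right.
macro_rules! dispatch_pair_inner {
    ($right:ident, $callback:ident, $left_ty:ident) => {
        dispatch_one!($right, $callback, $left_ty)
    };
}

/// Entry point: expands to a nested match covering every (left, right) pair.
macro_rules! dispatch_pair {
    ($left:ident, $right:ident, $callback:ident) => {
        dispatch_one!($left, dispatch_pair_inner, $right, $callback)
    };
}

/// Example callback: receives the two matched type names.
macro_rules! pair_name {
    ($left_ty:ident, $right_ty:ident) => {
        concat!(stringify!($left_ty), "x", stringify!($right_ty))
    };
}

fn describe_pair(left: DataType, right: DataType) -> &'static str {
    dispatch_pair!(left, right, pair_name)
}
```

Adding a type then means adding one arm to `dispatch_one`; the nine (and later sixteen) pair arms are produced by expansion rather than written by hand.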
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241225303


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   Actually, in the current arrow-rs compute kernels, the `typecast` is everywhere
   
   https://github.com/apache/arrow-rs/blob/b163b19d213c57170789f32a2011cbadf9ab4120/arrow-ord/src/comparison.rs#L1598-L1748
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241217452


##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! matches_all_supported_data_types {
+    ($expression:expr) => {
+        matches!(
+            $expression,
+            DataType::Boolean
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+                | DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::Float32
+                | DataType::Float64
+                | DataType::Decimal128(_, _)
+        )
+    };
+}
+pub use matches_all_supported_data_types;
+
+#[macro_export]
+macro_rules! dispatch_all_supported_data_types {
+    ($macro:ident $(, $x:ident)*) => {
+        $macro! {
+                [$($x),*],
+                { Boolean, BooleanArray },
+                { Int8, Int8Array },
+                { Int16, Int16Array },
+                { Int32, Int32Array },
+                { Int64, Int64Array },
+                { UInt8, UInt8Array },
+                { UInt16, UInt16Array },
+                { UInt32, UInt32Array },
+                { UInt64, UInt64Array },
+                { Float32, Float32Array },
+                { Float64, Float64Array },
+                { Decimal128, Decimal128Array }
+        }
+    };
+}
+
+pub use dispatch_all_supported_data_types;
+
+// TODO generate the matching type pairs
+#[macro_export]

Review Comment:
   I tried to move all the row accumulator supported types and the dispatching logic into these newly added macros.
   I know the macro `dispatch_all_supported_data_types_pairs` is not well implemented; I will try to use another macro to generate the body.





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1601265669

   > Yes, please help me to simplify this PR.
    
   I am hoping to find time tomorrow to do so 




[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1605455567

   tomorrow....




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1610787592

   @alamb @tustvold 
   Moving this PR to draft.




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1590420503

   @liurenjie1024 




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1590417262

   [Flamegraph image: the upload did not complete, so no image is available]
   




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1607316427

   BTW, I also tested the `enum_dispatch` macro. It can remove about `300` lines of code from this PR, but the performance does not seem to be as good as the handwritten version, so I gave it up and kept the current handwritten version.
   
   ```rust
   #[enum_dispatch]
   pub trait RowAccumulator: Send + Sync + Debug {
       /// updates the accumulator's state from a vector of arrays.
       fn update_batch(&self, values: &[ArrayRef], accessor: &mut RowAccessor)
           -> Result<()>;
   
       /// updates the accumulator's state from rows with the specified indices.
       fn update_row_indices(
           &self,
           values: &[ArrayRef],
           filter: &Option<&BooleanArray>,
           row_indices: &[usize],
           accessor: &mut RowAccessor,
       ) -> Result<()>;
   
       /// updates the accumulator's state from a rust native value.
       fn update_value<N: RowAccumulatorNativeType>(
           &self,
           native_value: Option<N>,
           accessor: &mut RowAccessor,
       );
   
       /// updates the accumulator's state from a vector of states.
       fn merge_batch(
           &mut self,
           states: &[ArrayRef],
           accessor: &mut RowAccessor,
       ) -> Result<()>;
   
       /// returns its value based on its current state.
       fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue>;
   
       /// State's starting field index in the row.
       fn state_index(&self) -> usize;
   }
   
   /// Returns if `data_type` is supported with `RowAccumulator`
   pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool {
       matches_all_supported_data_types!(data_type)
   }
   
   /// Enum to dispatch the RowAccumulator, RowAccumulator contains generic methods and can not be used as the trait objects
   #[derive(Debug)]
   #[enum_dispatch(RowAccumulator)]
   pub enum RowAccumulatorItem {
       AVG(AvgRowAccumulator),
       SUM(SumRowAccumulator),
       COUNT(CountRowAccumulator),
       MIN(MinRowAccumulator),
       MAX(MaxRowAccumulator),
       BITAND(BitAndRowAccumulator),
       BITOR(BitOrRowAccumulator),
       BITXOR(BitXorRowAccumulator),
       BOOLAND(BoolAndRowAccumulator),
       BOOLOR(BoolOrRowAccumulator),
   }
   ```
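
   For comparison, here is a minimal sketch of what handwritten enum dispatch over a trait with a generic method can look like. It does not use the real DataFusion types: `RowState`, `NativeValue`, `Acc`, `SumAcc`, `CountAcc` and `AccItem` are hypothetical stand-ins, and only two variants are shown.

   ```rust
   use std::fmt::Debug;

   /// Hypothetical stand-in for the row buffer that accumulators write into.
   #[derive(Debug, Default)]
   pub struct RowState {
       pub slots: Vec<f64>,
   }

   /// Hypothetical stand-in for `RowAccumulatorNativeType`.
   pub trait NativeValue: Copy {
       fn as_f64(self) -> f64;
   }
   impl NativeValue for i64 {
       fn as_f64(self) -> f64 {
           self as f64
       }
   }
   impl NativeValue for f64 {
       fn as_f64(self) -> f64 {
           self
       }
   }

   /// The generic method means this trait cannot be used as `dyn Acc`.
   pub trait Acc: Debug {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut RowState);
   }

   #[derive(Debug)]
   pub struct SumAcc {
       pub index: usize,
   }
   #[derive(Debug)]
   pub struct CountAcc {
       pub index: usize,
   }

   impl Acc for SumAcc {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut RowState) {
           if let Some(v) = value {
               state.slots[self.index] += v.as_f64();
           }
       }
   }
   impl Acc for CountAcc {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut RowState) {
           if value.is_some() {
               state.slots[self.index] += 1.0;
           }
       }
   }

   /// Handwritten dispatch: one `match` arm per variant forwards to the concrete type.
   #[derive(Debug)]
   pub enum AccItem {
       Sum(SumAcc),
       Count(CountAcc),
   }

   impl Acc for AccItem {
       fn update_value<N: NativeValue>(&self, value: Option<N>, state: &mut RowState) {
           match self {
               AccItem::Sum(acc) => acc.update_value(value, state),
               AccItem::Count(acc) => acc.update_value(value, state),
           }
       }
   }

   /// Usage: feed a few values through both accumulators into one row.
   pub fn demo_enum_dispatch() -> Vec<f64> {
       let accs = vec![
           AccItem::Sum(SumAcc { index: 0 }),
           AccItem::Count(CountAcc { index: 1 }),
       ];
       let mut state = RowState { slots: vec![0.0; 2] };
       for v in [Some(1_i64), None, Some(4)] {
           for acc in &accs {
               acc.update_value(v, &mut state);
           }
       }
       state.slots
   }
   ```

   The forwarding `impl` is the boilerplate that a macro would otherwise generate; every trait method needs one `match` with an arm per variant.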


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1623051719

   Closing this since we will have a better approach.




[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1616610850

   BTW the POC is https://github.com/apache/arrow-datafusion/pull/6800 and it is showing some very nice results so far




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241215805


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +785,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(

Review Comment:
   Yes, there are duplicates between those two.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241289085


##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! matches_all_supported_data_types {
+    ($expression:expr) => {
+        matches!(
+            $expression,
+            DataType::Boolean
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+                | DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::Float32
+                | DataType::Float64
+                | DataType::Decimal128(_, _)
+        )
+    };
+}
+pub use matches_all_supported_data_types;
+
+#[macro_export]
+macro_rules! dispatch_all_supported_data_types {
+    ($macro:ident $(, $x:ident)*) => {
+        $macro! {
+                [$($x),*],
+                { Boolean, BooleanArray },
+                { Int8, Int8Array },
+                { Int16, Int16Array },
+                { Int32, Int32Array },
+                { Int64, Int64Array },
+                { UInt8, UInt8Array },
+                { UInt16, UInt16Array },
+                { UInt32, UInt32Array },
+                { UInt64, UInt64Array },
+                { Float32, Float32Array },
+                { Float64, Float64Array },
+                { Decimal128, Decimal128Array }
+        }
+    };
+}
+
+pub use dispatch_all_supported_data_types;
+
+// TODO generate the matching type pairs
+#[macro_export]

Review Comment:
   I think the challenge is that this seems so specific -- maybe centralizing the instantiation of the type-specific accumulator code would help a lot (i.e. moving the type dispatch logic into something that looks like an arrow kernel). I will think about this as I sleep tonight.
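
   To make the idea concrete, here is a rough sketch of centralized type dispatch, assuming a single entry point that inspects the runtime type once and then calls a generic, type-specific function. `Column`, `Scalar`, `sum_typed` and `sum_column` are hypothetical names for illustration, not arrow or DataFusion APIs.

   ```rust
   /// Hypothetical stand-in for an Arrow array: the variant plays the role of `DataType`.
   #[derive(Debug, Clone)]
   pub enum Column {
       Int64(Vec<Option<i64>>),
       Float64(Vec<Option<f64>>),
   }

   /// Hypothetical result type, loosely analogous to `ScalarValue`.
   #[derive(Debug, PartialEq)]
   pub enum Scalar {
       Int64(Option<i64>),
       Float64(Option<f64>),
   }

   /// Type-specific inner loop, written once as a generic function.
   /// Nulls (`None`) are skipped; an all-null input yields `None`.
   fn sum_typed<T>(values: &[Option<T>]) -> Option<T>
   where
       T: Copy + std::ops::Add<Output = T>,
   {
       values.iter().flatten().copied().reduce(|a, b| a + b)
   }

   /// The only place that inspects the runtime type.
   pub fn sum_column(column: &Column) -> Scalar {
       match column {
           Column::Int64(v) => Scalar::Int64(sum_typed(v)),
           Column::Float64(v) => Scalar::Float64(sum_typed(v)),
       }
   }
   ```

   With this shape, supporting a new type means adding one arm to `sum_column` rather than touching every call site.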
   





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241295359


##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@

Review Comment:
   > I will think about this as I sleep tonight
   
   This made me smile -- such is the life of an engineer 🤣 





[GitHub] [arrow-datafusion] alamb commented on pull request #6657: RowAccumulators support generics

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#issuecomment-1604453454

   I have not forgotten about this -- I hope/plan to spend time on the weekend 




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241226010


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -496,37 +497,57 @@ impl GroupedHashAggregateStream {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
+        let mut single_value_acc_idx = vec![];

Review Comment:
   We should definitely address the duplicated code between `row_hash.rs` and `bounded_aggregate_stream.rs`.
   





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1231742904


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
         }
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
+
+    fn update_one_accumulator_with_native_value<T1>(
+        &mut self,
+        groups_with_rows: &[usize],
+        agg_input_array1: &T1,
+        acc_idx1: usize,
+        filter_bool_array: &[Option<&BooleanArray>],
+    ) -> Result<()>
+    where
+        T1: ArrowArrayReader,
+    {
+        let accumulator1 = &self.row_accumulators[acc_idx1];
+        let filter_array1 = &filter_bool_array[acc_idx1];
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+                accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
+    fn update_two_accumulator2_with_native_value<T1, T2>(

Review Comment:
   Yes, I think we have to handle the two-accumulator case specially: this avoids dynamic type dispatch in the inner loops and uses generics for static type dispatch, which is needed to achieve the best performance.
   
   The current update flow is:
   
   ```
   outermost loop: for each `group idx` in groups_with_rows
        construct the RowAccessor
        inner loop: for each accumulator
            update accumulator stats
   ```
   The accumulator loop **must be** the inner loop, so that we benefit from the row layout:
   the stats of the different accumulators are updated in a single row (the same `RowAccessor`).
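
   A simplified sketch of this flow, to show where the dynamic dispatch sits. `InputColumn`, `Group` and `update_dynamic` are hypothetical stand-ins: a `Vec<f64>` row with one slot per accumulator replaces the `RowAccessor`, and every accumulator is just a sum.

   ```rust
   /// Hypothetical column type; the variant is the runtime type tag.
   pub enum InputColumn {
       Int64(Vec<i64>),
       Float64(Vec<f64>),
   }

   /// Hypothetical group: the input row indices that belong to it and its
   /// state row, with one slot per accumulator.
   pub struct Group {
       pub indices: Vec<usize>,
       pub row: Vec<f64>,
   }

   /// Groups in the outer loop, accumulators in the inner loop.
   /// The `match` on the runtime type runs once per (group, row, accumulator).
   pub fn update_dynamic(groups: &mut [Group], inputs: &[InputColumn]) {
       for group in groups.iter_mut() {
           // All accumulators of this group write into the same row.
           let row = &mut group.row;
           for &idx in &group.indices {
               for (slot, input) in inputs.iter().enumerate() {
                   // Dynamic type dispatch in the hot path.
                   row[slot] += match input {
                       InputColumn::Int64(v) => v[idx] as f64,
                       InputColumn::Float64(v) => v[idx],
                   };
               }
           }
           // Clear the group indices in this group.
           group.indices.clear();
       }
   }

   /// Usage: two groups, two sum accumulators over differently typed inputs.
   pub fn demo_dynamic() -> Vec<Vec<f64>> {
       let inputs = vec![
           InputColumn::Int64(vec![1, 2, 3]),
           InputColumn::Float64(vec![0.5, 1.5, 2.5]),
       ];
       let mut groups = vec![
           Group { indices: vec![0, 2], row: vec![0.0; 2] },
           Group { indices: vec![1], row: vec![0.0; 2] },
       ];
       update_dynamic(&mut groups, &inputs);
       groups.into_iter().map(|g| g.row).collect()
   }
   ```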
   
   What we want to achieve is to do the array type cast outside the outermost loop:
   
   ```
   for each agg input array, type cast here
   outermost loop: for each `group idx` in groups_with_rows
        construct the RowAccessor
        inner loop: for each accumulator
             call generic method to update the accumulator stats
   ```
   
   Because the agg input type differs per accumulator, the generic method cannot be called from an inner loop over the accumulators, so I have to handle the two-accumulator case specially. The update flow becomes:
   
   1 Accumulator version:
   
   ```
   agg input array1, type cast to T1
   outermost loop: for each `group idx` in groups_with_rows
        construct the RowAccessor
        generic method <T1>: update accumulator stats
   ```
   
   2 Accumulators version:
   
   ```
   agg input array1, type cast to T1
   agg input array2, type cast to T2
   outermost loop: for each `group idx` in groups_with_rows
        construct the RowAccessor
        generic method <T1>: update accumulator1 stats
        generic method <T2>: update accumulator2 stats
   ```
   
   3 Accumulators version:
   
   ```
   agg input array1, type cast to T1
   agg input array2, type cast to T2
   agg input array3, type cast to T3
   outermost loop: for each `group idx` in groups_with_rows
        construct the RowAccessor
        generic method <T1>: update accumulator1 stats
        generic method <T2>: update accumulator2 stats
        generic method <T3>: update accumulator3 stats
   ```
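
   The two-accumulator flow above can be sketched roughly as follows. This is not the code in the PR: `TypedInput`, `GroupState`, `AnyInput`, `update_two` and `update_two_dispatch` are hypothetical names, a `Vec<f64>` row stands in for the `RowAccessor`, and both accumulators are plain sums.

   ```rust
   /// Hypothetical stand-in for a typed Arrow array reader.
   pub trait TypedInput {
       fn value(&self, idx: usize) -> f64;
   }
   impl TypedInput for Vec<i64> {
       fn value(&self, idx: usize) -> f64 {
           self[idx] as f64
       }
   }
   impl TypedInput for Vec<f64> {
       fn value(&self, idx: usize) -> f64 {
           self[idx]
       }
   }

   /// Hypothetical group state: row indices plus one state slot per accumulator.
   pub struct GroupState {
       pub indices: Vec<usize>,
       pub row: Vec<f64>,
   }

   /// Two-accumulator version: `T1` and `T2` are fixed at compile time, so the
   /// loop body contains no runtime type check.
   pub fn update_two<T1: TypedInput, T2: TypedInput>(
       groups: &mut [GroupState],
       input1: &T1,
       input2: &T2,
   ) {
       for group in groups.iter_mut() {
           for &idx in &group.indices {
               group.row[0] += input1.value(idx); // accumulator1
               group.row[1] += input2.value(idx); // accumulator2
           }
           group.indices.clear();
       }
   }

   /// Hypothetical dynamically typed input column.
   pub enum AnyInput {
       Int64(Vec<i64>),
       Float64(Vec<f64>),
   }

   /// The type cast happens here, once per batch, outside the outermost loop.
   pub fn update_two_dispatch(groups: &mut [GroupState], a: &AnyInput, b: &AnyInput) {
       match (a, b) {
           (AnyInput::Int64(x), AnyInput::Int64(y)) => update_two(groups, x, y),
           (AnyInput::Int64(x), AnyInput::Float64(y)) => update_two(groups, x, y),
           (AnyInput::Float64(x), AnyInput::Int64(y)) => update_two(groups, x, y),
           (AnyInput::Float64(x), AnyInput::Float64(y)) => update_two(groups, x, y),
       }
   }

   /// Usage: one group, an Int64 input for slot 0 and a Float64 input for slot 1.
   pub fn demo_two() -> Vec<f64> {
       let mut groups = vec![GroupState { indices: vec![0, 1, 2], row: vec![0.0; 2] }];
       let a = AnyInput::Int64(vec![1, 2, 3]);
       let b = AnyInput::Float64(vec![0.5, 0.5, 1.0]);
       update_two_dispatch(&mut groups, &a, &b);
       groups.remove(0).row
   }
   ```

   In this sketch the number of `match` arms is the product of the supported type counts, so each extra accumulator multiplies the dispatch code.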
   
   Hope this explains it. I'm not sure whether there are other good ways to simplify the code.
   
   





[GitHub] [arrow-datafusion] mingmwang closed pull request #6657: RowAccumulators support generics

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang closed pull request #6657: RowAccumulators support generics
URL: https://github.com/apache/arrow-datafusion/pull/6657

