Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/01 12:56:25 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

alamb opened a new pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808


   NOTE: this PR is WIP -- still todo:
   - [ ] Debugging
   - [ ] Measure performance
   - [ ] Handle hash collisions
   
   # Which issue does this PR close? 
   
   Closes https://github.com/apache/arrow-datafusion/issues/790 by implementing a new design for group by hash
   
   # Note
   Built on https://github.com/apache/arrow-datafusion/pull/793
   
   
   
   
   # Rationale for this change
   1. Regain the performance lost when we added support for GROUP BY NULL; see https://github.com/apache/arrow-datafusion/issues/790 for more details
   
   # What changes are included in this PR?
   1. Use a hash to create the appropriate grouping, and use indexes rather than looking up hash keys many times
   
   # Performance
   
   ## Measurements
   In progress
   
   ## Notes
   This approach should be faster because it:
   1. Avoids copying GroupValues into a Vec to hash, saving both time and space
   2. Avoids several hash table lookups (it uses indexes into `group_values` instead)
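
   The index-based layout described in these notes can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: it uses only the standard library (the PR itself works on Arrow arrays and a raw hash table), groups by a single `Option<i64>` column where `None` models NULL, and the names `group_rows` and `hash_key` are hypothetical.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::collections::HashMap;
   use std::hash::{Hash, Hasher};

   /// Hypothetical, simplified stand-in for the per-group state.
   #[derive(Debug)]
   struct GroupState {
       /// The group key; `None` models a NULL group value.
       key: Option<i64>,
       /// Rows of the current batch that belong to this group.
       indices: Vec<u32>,
   }

   /// Hashes the key in place, without copying it into a byte buffer.
   fn hash_key(key: &Option<i64>) -> u64 {
       let mut hasher = DefaultHasher::new();
       key.hash(&mut hasher);
       hasher.finish()
   }

   /// Assigns every row of `batch` to a group and returns the groups
   /// in order of first appearance.
   fn group_rows(batch: &[Option<i64>]) -> Vec<GroupState> {
       // hash -> indexes into `group_states` (more than one on a collision)
       let mut map: HashMap<u64, Vec<usize>> = HashMap::new();
       let mut group_states: Vec<GroupState> = Vec::new();

       for (row, key) in batch.iter().enumerate() {
           let candidates = map.entry(hash_key(key)).or_default();
           // Confirm the candidate holds the same key, not just the same hash.
           let existing = candidates
               .iter()
               .copied()
               .find(|&idx| group_states[idx].key == *key);
           match existing {
               // Existing group: remember this row.
               Some(idx) => group_states[idx].indices.push(row as u32),
               // New group: store its index in the map and create its state.
               None => {
                   candidates.push(group_states.len());
                   group_states.push(GroupState {
                       key: *key,
                       indices: vec![row as u32],
                   });
               }
           }
       }
       group_states
   }
   ```

   After the single map lookup per row, all further access goes through a plain index into `group_states`, and NULL keys are grouped like any other value.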
   
   # Are there any user-facing changes?
   Faster performance
   
   
   # Notes:
   I tried to keep the same names and structure as the existing hash algorithm (I found it easy to follow; nice work @Dandandan and @andygrove), which I think will make this easier to review
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897147077


   A TPC-H query that got quite a bit faster is q13: on Parquet at SF=100 it went from 37.8s to 29.5s





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r684639456



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());

Review comment:
       `create_accumulators` can use `?` now.







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683777504



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is

Review comment:
       added







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r685935170



##########
File path: datafusion/src/scalar.rs
##########
@@ -1654,6 +1776,159 @@ mod tests {
         assert_eq!(std::mem::size_of::<ScalarValue>(), 32);
     }
 
+    #[test]
+    fn scalar_eq_array() {
+        // Validate that eq_array has the same semantics as ScalarValue::eq
+        macro_rules! make_typed_vec {
+            ($INPUT:expr, $TYPE:ident) => {{
+                $INPUT
+                    .iter()
+                    .map(|v| v.map(|v| v as $TYPE))
+                    .collect::<Vec<_>>()
+            }};
+        }
+
+        let bool_vals = vec![Some(true), None, Some(false)];
+        let f32_vals = vec![Some(-1.0), None, Some(1.0)];
+        let f64_vals = make_typed_vec!(f32_vals, f64);
+
+        let i8_vals = vec![Some(-1), None, Some(1)];
+        let i16_vals = make_typed_vec!(i8_vals, i16);
+        let i32_vals = make_typed_vec!(i8_vals, i32);
+        let i64_vals = make_typed_vec!(i8_vals, i64);
+
+        let u8_vals = vec![Some(0), None, Some(1)];
+        let u16_vals = make_typed_vec!(u8_vals, u16);
+        let u32_vals = make_typed_vec!(u8_vals, u32);
+        let u64_vals = make_typed_vec!(u8_vals, u64);
+
+        let str_vals = vec![Some("foo"), None, Some("bar")];

Review comment:
       The `NULL` is present to test null handling (which found a bug in my dictionary implementation, actually)
   
   It is always the second entry because:
   1. I basically copy/pasted the tests
   2. I figured putting the validity bit in the middle (rather than at the ends) would be more likely to catch potential latent bugs (though your suggestion of varying its location is probably better). In theory all the null edge cases should be handled in the underlying arrow code
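
   One way to vary the position of the null, as suggested, is to rotate each input vector so that the `None` lands in every slot once. A minimal sketch using only the standard library; the helper name `rotations` is hypothetical and is not part of the test in this PR.

   ```rust
   /// Hypothetical helper: returns every rotation of `vals`, so that a
   /// single `None` entry appears at each position exactly once.
   fn rotations<T: Clone>(vals: &[Option<T>]) -> Vec<Vec<Option<T>>> {
       (0..vals.len())
           .map(|shift| {
               let mut rotated = vals.to_vec();
               // Move the first `shift` elements to the end.
               rotated.rotate_left(shift);
               rotated
           })
           .collect()
   }
   ```

   Each rotation could then be fed through the same per-type checks, at the cost of a slightly longer test.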







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683752478



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is

Review comment:
       I wonder whether it might be faster to also check for the hashes being equal. `get_mut` returns a potential match based on the values being in the same bucket (which might have a different hash).
   Checking that first before checking the actual values might save some time if there is a higher percentage of collisions, and the check itself is relatively slow.
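
   A rough sketch of the hash-first check, under stated assumptions: the slice `bucket` stands in for the candidate entries the map hands to the equality closure, group keys are plain strings rather than `ScalarValue`s, and the name `find_group` is hypothetical. Whether the extra comparison pays off depends on how often the closure is actually invoked for entries with a different hash.

   ```rust
   /// Hypothetical map entry: the full 64-bit hash stored next to the group index.
   type Entry = (u64, usize);

   /// Returns the index of the group whose key equals `row_key`, if any.
   fn find_group(
       bucket: &[Entry],
       hash: u64,
       row_key: &str,
       group_keys: &[String],
   ) -> Option<usize> {
       bucket
           .iter()
           .find(|(stored_hash, group_idx)| {
               // Cheap integer comparison first; `&&` short-circuits, so the
               // (potentially slow) value comparison only runs on a hash match.
               *stored_hash == hash && group_keys[*group_idx] == row_key
           })
           .map(|(_, group_idx)| *group_idx)
   }
   ```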







[GitHub] [arrow-datafusion] Dandandan commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897147077


   A TPC-H query that also gets quite a bit faster is q13: on Parquet, 37.8s -> 29.5s





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r685371811



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;

Review comment:
       in 0051a85c8







[GitHub] [arrow-datafusion] alamb commented on pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-895409906


   I am basically done with this PR. All that remains in my mind is to run some benchmarks and I'll mark it as ready for review





[GitHub] [arrow-datafusion] Dandandan commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897878325


   Thanks @alamb ! :tada: :tada: :tada: :tada: 





[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-895425959


   On the db-benchmark aggregation queries:
   
   PR:
   ```
   q1 took 33 ms
   q2 took 377 ms
   q3 took 986 ms
   q4 took 47 ms
   q5 took 973 ms
   q7 took 932 ms
   q10 took 4040 ms
   ```
   
   Master:
   ```
   q1 took 37 ms
   q2 took 325 ms
   q3 took 1431 ms
   q4 took 56 ms
   q5 took 1287 ms
   q7 took 1304 ms
   q10 took 9380 ms
   ```
   
   It looks like there is a small perf hit on q2, but I think the other 4 queries more than compensate for this :tada:
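
   To put the timings above in relative terms, the speedup per query is simply master time divided by PR time. A small sketch of that arithmetic; the names `speedup`, `TIMINGS`, and `report` are hypothetical, and the numbers are copied from the two listings above.

   ```rust
   /// Speedup of the PR over master: values above 1.0 mean the PR is faster.
   fn speedup(master_ms: f64, pr_ms: f64) -> f64 {
       master_ms / pr_ms
   }

   /// (query, master ms, PR ms), copied from the timings above.
   const TIMINGS: [(&str, f64, f64); 7] = [
       ("q1", 37.0, 33.0),
       ("q2", 325.0, 377.0),
       ("q3", 1431.0, 986.0),
       ("q4", 56.0, 47.0),
       ("q5", 1287.0, 973.0),
       ("q7", 1304.0, 932.0),
       ("q10", 9380.0, 4040.0),
   ];

   /// Formats one "query: ratio" line per benchmark query.
   fn report() -> Vec<String> {
       TIMINGS
           .iter()
           .map(|(query, master, pr)| format!("{}: {:.2}x", query, speedup(*master, *pr)))
           .collect()
   }
   ```

   By this measure q2 comes out below 1.0 (the small regression), while q10 is a bit more than twice as fast.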





[GitHub] [arrow-datafusion] alamb commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-896630464


   Rebased now that https://github.com/apache/arrow-datafusion/pull/812 has been merged





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683834491



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is

Review comment:
       So I have anecdotal evidence that this check simply slows things down (as the equivalence function isn't called when the hashes are different). Why do you think it will be called when the hash values are different?







[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897147077


   A TPC-H query that also gets quite a bit faster is q13: on Parquet SF=100, 37.8s -> 29.5s





[GitHub] [arrow-datafusion] Dandandan merged pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan merged pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808


   





[GitHub] [arrow-datafusion] NGA-TRAN commented on a change in pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
NGA-TRAN commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r685543182



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -779,8 +553,47 @@ impl GroupedHashAggregateStream {
 }
 
 type AccumulatorItem = Box<dyn Accumulator>;
-type Accumulators =
-    HashMap<Vec<u8>, (Box<[ScalarValue]>, Vec<AccumulatorItem>, Vec<u32>), RandomState>;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+    /// The actual group by values, one for each group column
+    group_by_values: Box<[ScalarValue]>,
+
+    // Accumulator state, one for each aggregate
+    accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}

Review comment:
       Nice

##########
File path: datafusion/src/scalar.rs
##########
@@ -973,22 +1011,106 @@ impl ScalarValue {
         })
     }
 
-    fn try_from_dict_array<K: ArrowDictionaryKeyType>(
+    /// Compares array @ index for equality with self, in an optimized fashion
+    ///
+    /// This method implements an optimized version of:
+    ///
+    /// ```text
+    ///     let arr_scalar = Self::try_from_array(array, index).unwrap();
+    ///     arr_scalar.eq(self)
+    /// ```
+    #[inline]
+    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
+        if let DataType::Dictionary(key_type, _) = array.data_type() {
+            return self.eq_array_dictionary(array, index, key_type);
+        }
+
+        match self {
+            ScalarValue::Boolean(val) => {
+                eq_array_primitive!(array, index, BooleanArray, val)
+            }
+            ScalarValue::Float32(val) => {
+                eq_array_primitive!(array, index, Float32Array, val)
+            }
+            ScalarValue::Float64(val) => {
+                eq_array_primitive!(array, index, Float64Array, val)
+            }
+            ScalarValue::Int8(val) => eq_array_primitive!(array, index, Int8Array, val),
+            ScalarValue::Int16(val) => eq_array_primitive!(array, index, Int16Array, val),
+            ScalarValue::Int32(val) => eq_array_primitive!(array, index, Int32Array, val),
+            ScalarValue::Int64(val) => eq_array_primitive!(array, index, Int64Array, val),
+            ScalarValue::UInt8(val) => eq_array_primitive!(array, index, UInt8Array, val),
+            ScalarValue::UInt16(val) => {
+                eq_array_primitive!(array, index, UInt16Array, val)
+            }
+            ScalarValue::UInt32(val) => {
+                eq_array_primitive!(array, index, UInt32Array, val)
+            }
+            ScalarValue::UInt64(val) => {
+                eq_array_primitive!(array, index, UInt64Array, val)
+            }
+            ScalarValue::Utf8(val) => eq_array_primitive!(array, index, StringArray, val),
+            ScalarValue::LargeUtf8(val) => {
+                eq_array_primitive!(array, index, LargeStringArray, val)
+            }
+            ScalarValue::Binary(val) => {
+                eq_array_primitive!(array, index, BinaryArray, val)
+            }
+            ScalarValue::LargeBinary(val) => {
+                eq_array_primitive!(array, index, LargeBinaryArray, val)
+            }
+            ScalarValue::List(_, _) => unimplemented!(),
+            ScalarValue::Date32(val) => {
+                eq_array_primitive!(array, index, Date32Array, val)
+            }
+            ScalarValue::Date64(val) => {
+                eq_array_primitive!(array, index, Date64Array, val)
+            }
+            ScalarValue::TimestampSecond(val) => {
+                eq_array_primitive!(array, index, TimestampSecondArray, val)
+            }
+            ScalarValue::TimestampMillisecond(val) => {
+                eq_array_primitive!(array, index, TimestampMillisecondArray, val)
+            }
+            ScalarValue::TimestampMicrosecond(val) => {
+                eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
+            }
+            ScalarValue::TimestampNanosecond(val) => {
+                eq_array_primitive!(array, index, TimestampNanosecondArray, val)
+            }
+            ScalarValue::IntervalYearMonth(val) => {
+                eq_array_primitive!(array, index, IntervalYearMonthArray, val)
+            }
+            ScalarValue::IntervalDayTime(val) => {
+                eq_array_primitive!(array, index, IntervalDayTimeArray, val)
+            }
+        }
+    }

Review comment:
       Nice

##########
File path: datafusion/src/scalar.rs
##########
@@ -1654,6 +1776,159 @@ mod tests {
         assert_eq!(std::mem::size_of::<ScalarValue>(), 32);
     }
 
+    #[test]
+    fn scalar_eq_array() {
+        // Validate that eq_array has the same semantics as ScalarValue::eq
+        macro_rules! make_typed_vec {
+            ($INPUT:expr, $TYPE:ident) => {{
+                $INPUT
+                    .iter()
+                    .map(|v| v.map(|v| v as $TYPE))
+                    .collect::<Vec<_>>()
+            }};
+        }
+
+        let bool_vals = vec![Some(true), None, Some(false)];
+        let f32_vals = vec![Some(-1.0), None, Some(1.0)];
+        let f64_vals = make_typed_vec!(f32_vals, f64);
+
+        let i8_vals = vec![Some(-1), None, Some(1)];
+        let i16_vals = make_typed_vec!(i8_vals, i16);
+        let i32_vals = make_typed_vec!(i8_vals, i32);
+        let i64_vals = make_typed_vec!(i8_vals, i64);
+
+        let u8_vals = vec![Some(0), None, Some(1)];
+        let u16_vals = make_typed_vec!(u8_vals, u16);
+        let u32_vals = make_typed_vec!(u8_vals, u32);
+        let u64_vals = make_typed_vec!(u8_vals, u64);
+
+        let str_vals = vec![Some("foo"), None, Some("bar")];

Review comment:
       I wonder why the second value is always NULL? Would it be more general to put it at a random position (first or third)?







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683978762



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;

Review comment:
       I think in the previous implementation reusing the `key` made sense, as the key only had to be cloned when it was inserted. But here we do the `clone` afterwards every time anyway.







[GitHub] [arrow-datafusion] alamb commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-896306350


   Thanks @Dandandan and @jorgecarleitao -- I plan to merge https://github.com/apache/arrow-datafusion/pull/812 first and leave this one open for another few days in case anyone else wants to comment.





[GitHub] [arrow-datafusion] alamb commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-898877303


   That is an interesting article.
   
   Looking at the summary:
   
   > The common implementation of the function using hashing techniques
   > suffers lower throughput rate due to the collision of the insert keys in
   > the hashing techniques.....
   
   I actually found it very hard to test the correctness of the group by collision handling, because the hashing technique in `create_hashes` was so good that I could not find any example data that hashed to the same value in a reasonable amount of time -- LOL
   
   However, the technique to search several slots at once might indeed be relevant
   
   
   <https://www.researchgate.net/figure/SIMD-accelerated-cuckoo-hashing-extended-from-Ross-et-al-14_fig1_326669722>
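One way to exercise a collision path without hunting for colliding data is to make the hash function injectable and have the test pass a constant one. The following is a minimal std-only sketch of that idea, not the code in this PR; `count_groups`, `real_hash` and `always_collide` are hypothetical names, and the key type is simplified to `Option<i64>`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Counts occurrences of each distinct value in `rows`, in first-seen order.
/// `hash_fn` is a parameter so that a test can force every row to collide.
fn count_groups<F>(rows: &[Option<i64>], hash_fn: F) -> Vec<(Option<i64>, usize)>
where
    F: Fn(&Option<i64>) -> u64,
{
    // hash -> indices into `groups` whose keys produced that hash
    let mut map: HashMap<u64, Vec<usize>> = HashMap::new();
    let mut groups: Vec<(Option<i64>, usize)> = Vec::new();

    for row in rows {
        let candidates = map.entry(hash_fn(row)).or_default();
        // Equal hashes do not prove equal keys, so compare the actual values.
        let found = candidates.iter().copied().find(|&idx| groups[idx].0 == *row);
        match found {
            Some(idx) => groups[idx].1 += 1,
            None => {
                candidates.push(groups.len());
                groups.push((*row, 1));
            }
        }
    }
    groups
}

/// An ordinary hash based on the standard library hasher.
fn real_hash(v: &Option<i64>) -> u64 {
    let mut h = DefaultHasher::new();
    v.hash(&mut h);
    h.finish()
}

/// A deliberately terrible hash: every key lands on the same value.
fn always_collide(_v: &Option<i64>) -> u64 {
    42
}
```

If the grouping is correct, `count_groups(rows, real_hash)` and `count_groups(rows, always_collide)` return the same result for any input, including rows that contain NULLs.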
   
   On Sat, Aug 14, 2021 at 4:58 AM Jorge Leitao ***@***.***>
   wrote:
   
   > Potentially relevant:
   > https://www.researchgate.net/publication/326669722_SIMD_Vectorized_Hashing_for_Grouped_Aggregation_22nd_European_Conference_ADBIS_2018_Budapest_Hungary_September_2-5_2018_Proceedings
   





[GitHub] [arrow-datafusion] Dandandan commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-898881106


   I believe hashbrown already implements many tricks like this; it's one of the fastest hash table implementations:
   https://docs.rs/hashbrown/0.11.2/hashbrown/hash_map/index.html
   
   There is also a nightly rawtable API to retrieve multiple values at once `get_each_mut`, which might be a bit faster.
   
   So far, in profiling results, I haven't seen the probing/hashmap itself being a very expensive part. AFAIK it's mostly other parts that could be optimized: updating the states/values, collision checks, converting to array, creating hash values, the actual `sum` over the array, etc.





[GitHub] [arrow-datafusion] Dandandan commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-896287382


   I had a good look and think all looks GREAT!





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683754338



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is

Review comment:
       So basically I would suggest adding this at the top of the closure:
   ```rust
   if hash != _hash {
       return false
   }
   ```
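To illustrate why comparing the stored hash first can pay off, here is a minimal std-only sketch. It is a toy bucketed table, not hashbrown and not the code in this PR; `ToyTable`, `Group` and `key_comparisons` are hypothetical names. The full key comparison stands in for the per-column `eq_array` calls.

```rust
/// One stored group: its full 64-bit hash plus the key,
/// which may be expensive to compare.
struct Group {
    hash: u64,
    key: Vec<Option<i64>>,
}

/// Toy stand-in for a hash table: a fixed number of buckets, each holding
/// indices into `groups`. Distinct hashes can share a bucket.
struct ToyTable {
    buckets: Vec<Vec<usize>>,
    groups: Vec<Group>,
    /// Number of full key comparisons performed so far.
    key_comparisons: usize,
}

impl ToyTable {
    fn new(num_buckets: usize) -> Self {
        ToyTable {
            buckets: vec![Vec::new(); num_buckets],
            groups: Vec::new(),
            key_comparisons: 0,
        }
    }

    /// Returns the index of the group for `key`, inserting it if needed.
    fn find_or_insert(&mut self, hash: u64, key: &[Option<i64>]) -> usize {
        let bucket = (hash as usize) % self.buckets.len();
        for &idx in &self.buckets[bucket] {
            let group = &self.groups[idx];
            // Cheap check first: entries with a different full hash
            // cannot hold the same key, so skip the key comparison.
            if group.hash != hash {
                continue;
            }
            self.key_comparisons += 1;
            if group.key.as_slice() == key {
                return idx;
            }
        }
        // No match: create a new group and register it in the bucket.
        let idx = self.groups.len();
        self.groups.push(Group { hash, key: key.to_vec() });
        self.buckets[bucket].push(idx);
        idx
    }
}
```

With 4 buckets, hashes 1 and 5 share a bucket, yet looking up hash 5 never compares keys against the entry stored under hash 1. How much this saves in a real table depends on how often the table passes non-matching entries to the equality callback.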







[GitHub] [arrow-datafusion] alamb commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897923361


   Thanks everyone for all the help. This was a very cool experience of collaborative development for me 





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683978762



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;

Review comment:
       I think in the previous implementation reusing the `key` made sense, as the key only had to be cloned when it was inserted (and was otherwise only used for indexing). But here we do the `clone` afterwards every time anyway.







[GitHub] [arrow-datafusion] jorgecarleitao commented on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-898873524


   Potentially relevant: https://www.researchgate.net/publication/326669722_SIMD_Vectorized_Hashing_for_Grouped_Aggregation_22nd_European_Conference_ADBIS_2018_Budapest_Hungary_September_2-5_2018_Proceedings





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683975473



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;

Review comment:
       Not sure whether reusing the `Vec` and then cloning the `Vec<ScalarValue>` later is actually better here. I think the time saved by reusing the `Vec` might be less than the cost of cloning the individual `ScalarValue`s (and it also makes the code harder to read).
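The two approaches being compared can be sketched side by side. This is a minimal std-only illustration, not DataFusion's code: `Scalar` and `Column` are hypothetical stand-ins for `ScalarValue` and Arrow arrays, and both function names are made up.

```rust
/// Hypothetical stand-in for `ScalarValue`.
#[derive(Clone, Debug, PartialEq)]
enum Scalar {
    Int(Option<i64>),
    Text(Option<String>),
}

/// Hypothetical stand-in for a group-by column (an Arrow array).
enum Column {
    Int(Vec<Option<i64>>),
    Text(Vec<Option<String>>),
}

/// Copies the value at `row` out of a column into an owned scalar.
fn value_at(col: &Column, row: usize) -> Scalar {
    match col {
        Column::Int(v) => Scalar::Int(v[row]),
        Column::Text(v) => Scalar::Text(v[row].clone()),
    }
}

/// Variant A: overwrite a reusable scratch buffer, then clone it for storage.
/// Every scalar is built once and cloned once.
fn group_values_via_scratch(
    cols: &[Column],
    row: usize,
    scratch: &mut [Scalar],
) -> Box<[Scalar]> {
    for (slot, col) in scratch.iter_mut().zip(cols) {
        *slot = value_at(col, row);
    }
    scratch.to_vec().into_boxed_slice()
}

/// Variant B: build the stored values directly. One allocation for the
/// slice, and no second copy of each scalar.
fn group_values_direct(cols: &[Column], row: usize) -> Box<[Scalar]> {
    cols.iter().map(|col| value_at(col, row)).collect()
}
```

Both variants produce the same values; which one is faster would have to be measured, since it depends on how expensive cloning a scalar is compared with allocating the slice.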







[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897147077


   A TPC-H query that also gets quite a bit faster is q13: on parquet SF=10, 37.8s -> 29.5s





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683939523



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row' index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is

Review comment:
       Based on looking at the implementation, which also mentions it here:
   
   https://rust-lang.github.io/hashbrown/hashbrown/raw/struct.RawTable.html#method.iter_hash
   
   It seems it might not happen that often.







[GitHub] [arrow-datafusion] Dandandan commented on pull request #808: (WIP) Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-895425959


   On the db-benchmark aggregation queries:
   
   PR:
   ```
   q1 took 33 ms
   q2 took 377 ms
   q3 took 986 ms
   q4 took 47 ms
   q5 took 973 ms
   q7 took 932 ms
   q10 took 4040 ms
   ```
   
   Master:
   ```
   q1 took 37 ms
   q2 took 325 ms
   q3 took 1431 ms
   q4 took 56 ms
   q5 took 1287 ms
   q7 took 1304 ms
   q10 took 9380 ms
   ```
   
   It looks like there is a small perf hit on q2, but I think the other 4 queries more than compensate for this.
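For readers who want the ratios rather than raw milliseconds, the timings listed above can be turned into speedups with a few lines of arithmetic. This is only a sketch over the numbers quoted in this comment; the function names are hypothetical.

```rust
/// (query, PR time in ms, master time in ms), copied from the lists above.
static TIMINGS: [(&str, u32, u32); 7] = [
    ("q1", 33, 37),
    ("q2", 377, 325),
    ("q3", 986, 1431),
    ("q4", 47, 56),
    ("q5", 973, 1287),
    ("q7", 932, 1304),
    ("q10", 4040, 9380),
];

/// Speedup of the PR relative to master; values above 1.0 mean the PR is faster.
fn speedup(pr_ms: u32, master_ms: u32) -> f64 {
    master_ms as f64 / pr_ms as f64
}

/// Queries that got slower with the PR.
fn regressions() -> Vec<&'static str> {
    TIMINGS
        .iter()
        .filter(|(_, pr, master)| pr > master)
        .map(|(query, _, _)| *query)
        .collect()
}

/// Total time over all listed queries as (PR ms, master ms).
fn totals() -> (u32, u32) {
    TIMINGS
        .iter()
        .fold((0, 0), |(p, m), (_, pr, master)| (p + pr, m + master))
}
```

On these numbers q2 is the only query that regresses, and the summed time over the seven queries drops from 13820 ms on master to 7388 ms with the PR.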





[GitHub] [arrow-datafusion] Dandandan edited a comment on pull request #808: Rework GroupByHash to for faster performance and support grouping by nulls

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#issuecomment-897147077


   A TPC-H query that got quite a bit faster is q13: on parquet SF=100, 37.8s -> 29.5s

