You are viewing a plain-text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/11 06:40:02 UTC

[arrow-datafusion] branch main updated: minor: Refactor row_hash implementation (#5936)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 52fa2285b4 minor: Refactor row_hash implementation  (#5936)
52fa2285b4 is described below

commit 52fa2285b43ad6712e9b8bf6c05b4b8ff93f44f9
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Apr 11 09:39:57 2023 +0300

    minor: Refactor row_hash implementation  (#5936)
    
    * split large chunks to functions
    
    * Remove double type annotations
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/common/src/utils.rs                     |  54 +++
 .../core/src/physical_plan/aggregates/row_hash.rs  | 401 +++++++++++----------
 2 files changed, 258 insertions(+), 197 deletions(-)

diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 3451152686..9b2604d2ee 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -19,7 +19,10 @@
 
 use crate::{DataFusionError, Result, ScalarValue};
 use arrow::array::ArrayRef;
+use arrow::compute;
 use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
+use arrow_array::types::UInt32Type;
+use arrow_array::PrimitiveArray;
 use sqlparser::ast::Ident;
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::{Parser, ParserError};
@@ -260,6 +263,24 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
     Ok(idents)
 }
 
+/// Construct a new `Vec<ArrayRef>` holding, for each array in `arrays`, the rows at `indices` (via `compute::take`).
+pub fn get_arrayref_at_indices(
+    arrays: &[ArrayRef],
+    indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<ArrayRef>> {
+    arrays
+        .iter()
+        .map(|array| {
+            compute::take(
+                array.as_ref(),
+                indices,
+                None, // None: no index check
+            )
+            .map_err(DataFusionError::ArrowError)
+        })
+        .collect()
+}
+
 pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
     parse_identifiers(s)
         .unwrap_or_default()
@@ -579,4 +600,37 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_arrayref_at_indices() -> Result<()> {
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 5., 0.0])),
+        ];
+
+        let row_indices_vec: Vec<Vec<u32>> = vec![
+            // Get rows 0 and 1
+            vec![0, 1],
+            // Get rows 0 and 2
+            vec![0, 2],
+            // Get rows 1 and 3
+            vec![1, 3],
+            // Get rows 2 and 4
+            vec![2, 4],
+        ];
+        for row_indices in row_indices_vec {
+            let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+            let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+            for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
+                for (idx, orig_idx) in row_indices.iter().enumerate() {
+                    let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
+                    let res2 = ScalarValue::try_from_array(arr_chunk, idx)?;
+                    assert_eq!(res1, res2);
+                }
+            }
+        }
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 612b707cc1..42ba9f8cb3 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -40,12 +40,11 @@ use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, PrimitiveArray};
-use arrow::array::{Array, UInt32Builder};
+use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{array::ArrayRef, compute};
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::utils::get_arrayref_at_indices;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
@@ -294,6 +293,192 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
+    /// Update `row_aggr_state` with `group_values` (the evaluated group-by expressions); returns the indices of groups that received rows from this batch.
+    fn update_group_state(
+        &mut self,
+        group_values: &[ArrayRef],
+        allocated: &mut usize,
+    ) -> Result<Vec<usize>> {
+        let group_rows = self.row_converter.convert_columns(group_values)?;
+        let n_rows = group_rows.num_rows();
+        // 1.1 construct the key from the group values
+        // 1.2 construct the mapping key if it does not exist
+        // 1.3 add the row's index to `indices`
+
+        // track which entries in `aggr_state` have rows in this batch to aggregate
+        let mut groups_with_rows = vec![];
+
+        // 1.1 Calculate the group keys for the group values
+        let mut batch_hashes = vec![0; n_rows];
+        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+
+        let RowAggregationState {
+            map: row_map,
+            group_states: row_group_states,
+            ..
+        } = &mut self.row_aggr_state;
+
+        for (row, hash) in batch_hashes.into_iter().enumerate() {
+            let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
+                // verify that a group that we are inserting with hash is
+                // actually the same key value as the group in
+                // existing_idx  (aka group_values @ row)
+                let group_state = &row_group_states[*group_idx];
+                group_rows.row(row) == group_state.group_by_values.row()
+            });
+
+            match entry {
+                // Existing entry for this group value
+                Some((_hash, group_idx)) => {
+                    let group_state = &mut row_group_states[*group_idx];
+
+                    // 1.3
+                    if group_state.indices.is_empty() {
+                        groups_with_rows.push(*group_idx);
+                    };
+
+                    group_state.indices.push_accounted(row as u32, allocated); // remember this row
+                }
+                //  1.2 Need to create new entry
+                None => {
+                    let accumulator_set =
+                        aggregates::create_accumulators(&self.normal_aggr_expr)?;
+                    // Add new entry to group_states and save newly created index
+                    let group_state = RowGroupState {
+                        group_by_values: group_rows.row(row).owned(),
+                        aggregation_buffer: vec![
+                            0;
+                            self.row_aggr_layout.fixed_part_width()
+                        ],
+                        accumulator_set,
+                        indices: vec![row as u32], // 1.3
+                    };
+                    let group_idx = row_group_states.len();
+
+                    // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
+                    // `group_states` (see allocation down below)
+                    *allocated += (std::mem::size_of::<u8>()
+                        * group_state.group_by_values.as_ref().len())
+                        + (std::mem::size_of::<u8>()
+                            * group_state.aggregation_buffer.capacity())
+                        + (std::mem::size_of::<u32>() * group_state.indices.capacity());
+
+                    // Allocation done by normal accumulators
+                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
+                        * group_state.accumulator_set.capacity())
+                        + group_state
+                            .accumulator_set
+                            .iter()
+                            .map(|accu| accu.size())
+                            .sum::<usize>();
+
+                    // for hasher function, use precomputed hash value
+                    row_map.insert_accounted(
+                        (hash, group_idx),
+                        |(hash, _group_index)| *hash,
+                        allocated,
+                    );
+
+                    row_group_states.push_accounted(group_state, allocated);
+
+                    groups_with_rows.push(group_idx);
+                }
+            };
+        }
+        Ok(groups_with_rows)
+    }
+
+    /// Feed each group's slice of `row_values` / `normal_values` (bounded by consecutive `offsets`) into its accumulators, then clear the group's `indices`.
+    fn update_accumulators(
+        &mut self,
+        groups_with_rows: &[usize],
+        offsets: &[usize],
+        row_values: &[Vec<ArrayRef>],
+        normal_values: &[Vec<ArrayRef>],
+        allocated: &mut usize,
+    ) -> Result<()> {
+        // 2.1 for each key in this batch
+        // 2.2 for each aggregation
+        // 2.3 `slice` from each of its arrays the keys' values
+        // 2.4 update / merge the accumulator with the values
+        // 2.5 clear indices
+        groups_with_rows
+            .iter()
+            .zip(offsets.windows(2))
+            .try_for_each(|(group_idx, offsets)| {
+                let group_state = &mut self.row_aggr_state.group_states[*group_idx];
+                // 2.2
+                self.row_accumulators
+                    .iter_mut()
+                    .zip(row_values.iter())
+                    .map(|(accumulator, aggr_array)| {
+                        (
+                            accumulator,
+                            aggr_array
+                                .iter()
+                                .map(|array| {
+                                    // 2.3
+                                    array.slice(offsets[0], offsets[1] - offsets[0])
+                                })
+                                .collect::<Vec<ArrayRef>>(),
+                        )
+                    })
+                    .try_for_each(|(accumulator, values)| {
+                        let mut state_accessor =
+                            RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+                        state_accessor
+                            .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+                        match self.mode {
+                            AggregateMode::Partial => {
+                                accumulator.update_batch(&values, &mut state_accessor)
+                            }
+                            AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                                // note: the aggregation here is over states, not values, thus the merge
+                                accumulator.merge_batch(&values, &mut state_accessor)
+                            }
+                        }
+                    })
+                    // `?` propagates the first row-accumulator error (indices are cleared at 2.5 below)
+                    .and(Ok(()))?;
+                // normal accumulators
+                group_state
+                    .accumulator_set
+                    .iter_mut()
+                    .zip(normal_values.iter())
+                    .map(|(accumulator, aggr_array)| {
+                        (
+                            accumulator,
+                            aggr_array
+                                .iter()
+                                .map(|array| {
+                                    // 2.3
+                                    array.slice(offsets[0], offsets[1] - offsets[0])
+                                })
+                                .collect::<Vec<ArrayRef>>(),
+                        )
+                    })
+                    .try_for_each(|(accumulator, values)| {
+                        let size_pre = accumulator.size();
+                        let res = match self.mode {
+                            AggregateMode::Partial => accumulator.update_batch(&values),
+                            AggregateMode::FinalPartitioned | AggregateMode::Final => {
+                                // note: the aggregation here is over states, not values, thus the merge
+                                accumulator.merge_batch(&values)
+                            }
+                        };
+                        let size_post = accumulator.size();
+                        *allocated += size_post.saturating_sub(size_pre);
+                        res
+                    })
+                    // 2.5 clear indices; `.and`'s argument is evaluated eagerly, so this runs even if a normal accumulator failed
+                    .and({
+                        group_state.indices.clear();
+                        Ok(())
+                    })
+            })?;
+        Ok(())
+    }
+
     /// Perform group-by aggregation for the given [`RecordBatch`].
     ///
     /// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -303,11 +488,6 @@ impl GroupedHashAggregateStream {
         let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
         // Keep track of memory allocated:
         let mut allocated = 0usize;
-        let RowAggregationState {
-            map: row_map,
-            group_states: row_group_states,
-            ..
-        } = &mut self.row_aggr_state;
 
         // Evaluate the aggregation expressions.
         // We could evaluate them after the `take`, but since we need to evaluate all
@@ -319,192 +499,31 @@ impl GroupedHashAggregateStream {
 
         let row_converter_size_pre = self.row_converter.size();
         for group_values in &group_by_values {
-            let group_rows = self.row_converter.convert_columns(group_values)?;
-
-            // 1.1 construct the key from the group values
-            // 1.2 construct the mapping key if it does not exist
-            // 1.3 add the row' index to `indices`
-
-            // track which entries in `aggr_state` have rows in this batch to aggregate
-            let mut groups_with_rows = vec![];
-
-            // 1.1 Calculate the group keys for the group values
-            let mut batch_hashes = vec![0; batch.num_rows()];
-            create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-
-            for (row, hash) in batch_hashes.into_iter().enumerate() {
-                let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
-                    // verify that a group that we are inserting with hash is
-                    // actually the same key value as the group in
-                    // existing_idx  (aka group_values @ row)
-                    let group_state = &row_group_states[*group_idx];
-                    group_rows.row(row) == group_state.group_by_values.row()
-                });
-
-                match entry {
-                    // Existing entry for this group value
-                    Some((_hash, group_idx)) => {
-                        let group_state = &mut row_group_states[*group_idx];
-
-                        // 1.3
-                        if group_state.indices.is_empty() {
-                            groups_with_rows.push(*group_idx);
-                        };
-
-                        group_state
-                            .indices
-                            .push_accounted(row as u32, &mut allocated); // remember this row
-                    }
-                    //  1.2 Need to create new entry
-                    None => {
-                        let accumulator_set =
-                            aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                        // Add new entry to group_states and save newly created index
-                        let group_state = RowGroupState {
-                            group_by_values: group_rows.row(row).owned(),
-                            aggregation_buffer: vec![
-                                0;
-                                self.row_aggr_layout
-                                    .fixed_part_width()
-                            ],
-                            accumulator_set,
-                            indices: vec![row as u32], // 1.3
-                        };
-                        let group_idx = row_group_states.len();
-
-                        // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
-                        // `group_states` (see allocation down below)
-                        allocated += (std::mem::size_of::<u8>()
-                            * group_state.group_by_values.as_ref().len())
-                            + (std::mem::size_of::<u8>()
-                                * group_state.aggregation_buffer.capacity())
-                            + (std::mem::size_of::<u32>()
-                                * group_state.indices.capacity());
-
-                        // Allocation done by normal accumulators
-                        allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                            * group_state.accumulator_set.capacity())
-                            + group_state
-                                .accumulator_set
-                                .iter()
-                                .map(|accu| accu.size())
-                                .sum::<usize>();
-
-                        // for hasher function, use precomputed hash value
-                        row_map.insert_accounted(
-                            (hash, group_idx),
-                            |(hash, _group_index)| *hash,
-                            &mut allocated,
-                        );
-
-                        row_group_states.push_accounted(group_state, &mut allocated);
-
-                        groups_with_rows.push(group_idx);
-                    }
-                };
-            }
+            let groups_with_rows =
+                self.update_group_state(group_values, &mut allocated)?;
 
             // Collect all indices + offsets based on keys in this vec
             let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
             let mut offsets = vec![0];
             let mut offset_so_far = 0;
             for &group_idx in groups_with_rows.iter() {
-                let indices = &row_group_states[group_idx].indices;
+                let indices = &self.row_aggr_state.group_states[group_idx].indices;
                 batch_indices.append_slice(indices);
                 offset_so_far += indices.len();
                 offsets.push(offset_so_far);
             }
             let batch_indices = batch_indices.finish();
 
-            let row_values = get_at_indices(&row_aggr_input_values, &batch_indices);
-            let normal_values = get_at_indices(&normal_aggr_input_values, &batch_indices);
-
-            // 2.1 for each key in this batch
-            // 2.2 for each aggregation
-            // 2.3 `slice` from each of its arrays the keys' values
-            // 2.4 update / merge the accumulator with the values
-            // 2.5 clear indices
-            groups_with_rows
-                .iter()
-                .zip(offsets.windows(2))
-                .try_for_each(|(group_idx, offsets)| {
-                    let group_state = &mut row_group_states[*group_idx];
-                    // 2.2
-                    self.row_accumulators
-                        .iter_mut()
-                        .zip(row_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let mut state_accessor = RowAccessor::new_from_layout(
-                                self.row_aggr_layout.clone(),
-                            );
-                            state_accessor.point_to(
-                                0,
-                                group_state.aggregation_buffer.as_mut_slice(),
-                            );
-                            match self.mode {
-                                AggregateMode::Partial => {
-                                    accumulator.update_batch(&values, &mut state_accessor)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
-                                    // note: the aggregation here is over states, not values, thus the merge
-                                    accumulator.merge_batch(&values, &mut state_accessor)
-                                }
-                            }
-                        })
-                        // 2.5
-                        .and(Ok(()))?;
-                    // normal accumulators
-                    group_state
-                        .accumulator_set
-                        .iter_mut()
-                        .zip(normal_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let size_pre = accumulator.size();
-                            let res = match self.mode {
-                                AggregateMode::Partial => {
-                                    accumulator.update_batch(&values)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
-                                    // note: the aggregation here is over states, not values, thus the merge
-                                    accumulator.merge_batch(&values)
-                                }
-                            };
-                            let size_post = accumulator.size();
-                            allocated += size_post.saturating_sub(size_pre);
-                            res
-                        })
-                        // 2.5
-                        .and({
-                            group_state.indices.clear();
-                            Ok(())
-                        })
-                })?;
+            let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
+            let normal_values =
+                get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+            self.update_accumulators(
+                &groups_with_rows,
+                &offsets,
+                &row_values,
+                &normal_values,
+                &mut allocated,
+            )?;
         }
         allocated += self
             .row_converter
@@ -699,23 +718,11 @@ fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) -> Vec<Ar
 }
 
 fn get_at_indices(
-    input_values: &[Vec<Arc<dyn Array>>],
+    input_values: &[Vec<ArrayRef>],
     batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Vec<Arc<dyn Array>>> {
+) -> Result<Vec<Vec<ArrayRef>>> {
     input_values
         .iter()
-        .map(|array| {
-            array
-                .iter()
-                .map(|array| {
-                    compute::take(
-                        array.as_ref(),
-                        batch_indices,
-                        None, // None: no index check
-                    )
-                    .unwrap()
-                })
-                .collect()
-        })
+        .map(|array| get_arrayref_at_indices(array, batch_indices))
         .collect()
 }