You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/11 06:40:02 UTC
[arrow-datafusion] branch main updated: minor: Refactor row_hash implementation (#5936)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 52fa2285b4 minor: Refactor row_hash implementation (#5936)
52fa2285b4 is described below
commit 52fa2285b43ad6712e9b8bf6c05b4b8ff93f44f9
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Apr 11 09:39:57 2023 +0300
minor: Refactor row_hash implementation (#5936)
* split large chunks to functions
* Remove double type annotations
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/common/src/utils.rs | 54 +++
.../core/src/physical_plan/aggregates/row_hash.rs | 401 +++++++++++----------
2 files changed, 258 insertions(+), 197 deletions(-)
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 3451152686..9b2604d2ee 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -19,7 +19,10 @@
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::ArrayRef;
+use arrow::compute;
use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
+use arrow_array::types::UInt32Type;
+use arrow_array::PrimitiveArray;
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};
@@ -260,6 +263,24 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
Ok(idents)
}
+/// Construct a new Vec<ArrayRef> from the rows of the `arrays` at the `indices`.
+pub fn get_arrayref_at_indices(
+ arrays: &[ArrayRef],
+ indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<ArrayRef>> {
+ arrays
+ .iter()
+ .map(|array| {
+ compute::take(
+ array.as_ref(),
+ indices,
+ None, // None: no index check
+ )
+ .map_err(DataFusionError::ArrowError)
+ })
+ .collect()
+}
+
pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
parse_identifiers(s)
.unwrap_or_default()
@@ -579,4 +600,37 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_get_arrayref_at_indices() -> Result<()> {
+ let arrays: Vec<ArrayRef> = vec![
+ Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 9., 10.])),
+ Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 4.0, 5.0])),
+ Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 10., 11.0])),
+ Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 5., 0.0])),
+ ];
+
+ let row_indices_vec: Vec<Vec<u32>> = vec![
+ // Get rows 0 and 1
+ vec![0, 1],
+ // Get rows 0 and 2
+ vec![0, 2],
+ // Get rows 1 and 3
+ vec![1, 3],
+ // Get rows 2 and 4
+ vec![2, 4],
+ ];
+ for row_indices in row_indices_vec {
+ let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+ let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+ for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
+ for (idx, orig_idx) in row_indices.iter().enumerate() {
+ let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
+ let res2 = ScalarValue::try_from_array(arr_chunk, idx)?;
+ assert_eq!(res1, res2);
+ }
+ }
+ }
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 612b707cc1..42ba9f8cb3 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -40,12 +40,11 @@ use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, PrimitiveArray};
-use arrow::array::{Array, UInt32Builder};
+use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{array::ArrayRef, compute};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
@@ -294,6 +293,192 @@ impl RecordBatchStream for GroupedHashAggregateStream {
}
impl GroupedHashAggregateStream {
+ // Update the row_aggr_state according to group_by values (result of group_by_expressions)
+ fn update_group_state(
+ &mut self,
+ group_values: &[ArrayRef],
+ allocated: &mut usize,
+ ) -> Result<Vec<usize>> {
+ let group_rows = self.row_converter.convert_columns(group_values)?;
+ let n_rows = group_rows.num_rows();
+ // 1.1 construct the key from the group values
+ // 1.2 construct the mapping key if it does not exist
+ // 1.3 add the row's index to `indices`
+
+ // track which entries in `aggr_state` have rows in this batch to aggregate
+ let mut groups_with_rows = vec![];
+
+ // 1.1 Calculate the group keys for the group values
+ let mut batch_hashes = vec![0; n_rows];
+ create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+
+ let RowAggregationState {
+ map: row_map,
+ group_states: row_group_states,
+ ..
+ } = &mut self.row_aggr_state;
+
+ for (row, hash) in batch_hashes.into_iter().enumerate() {
+ let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
+ // verify that a group that we are inserting with hash is
+ // actually the same key value as the group in
+ // existing_idx (aka group_values @ row)
+ let group_state = &row_group_states[*group_idx];
+ group_rows.row(row) == group_state.group_by_values.row()
+ });
+
+ match entry {
+ // Existing entry for this group value
+ Some((_hash, group_idx)) => {
+ let group_state = &mut row_group_states[*group_idx];
+
+ // 1.3
+ if group_state.indices.is_empty() {
+ groups_with_rows.push(*group_idx);
+ };
+
+ group_state.indices.push_accounted(row as u32, allocated); // remember this row
+ }
+ // 1.2 Need to create new entry
+ None => {
+ let accumulator_set =
+ aggregates::create_accumulators(&self.normal_aggr_expr)?;
+ // Add new entry to group_states and save newly created index
+ let group_state = RowGroupState {
+ group_by_values: group_rows.row(row).owned(),
+ aggregation_buffer: vec![
+ 0;
+ self.row_aggr_layout.fixed_part_width()
+ ],
+ accumulator_set,
+ indices: vec![row as u32], // 1.3
+ };
+ let group_idx = row_group_states.len();
+
+ // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
+ // `group_states` (see allocation down below)
+ *allocated += (std::mem::size_of::<u8>()
+ * group_state.group_by_values.as_ref().len())
+ + (std::mem::size_of::<u8>()
+ * group_state.aggregation_buffer.capacity())
+ + (std::mem::size_of::<u32>() * group_state.indices.capacity());
+
+ // Allocation done by normal accumulators
+ *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
+ * group_state.accumulator_set.capacity())
+ + group_state
+ .accumulator_set
+ .iter()
+ .map(|accu| accu.size())
+ .sum::<usize>();
+
+ // for hasher function, use precomputed hash value
+ row_map.insert_accounted(
+ (hash, group_idx),
+ |(hash, _group_index)| *hash,
+ allocated,
+ );
+
+ row_group_states.push_accounted(group_state, allocated);
+
+ groups_with_rows.push(group_idx);
+ }
+ };
+ }
+ Ok(groups_with_rows)
+ }
+
+ // Update the accumulator results, according to row_aggr_state.
+ fn update_accumulators(
+ &mut self,
+ groups_with_rows: &[usize],
+ offsets: &[usize],
+ row_values: &[Vec<ArrayRef>],
+ normal_values: &[Vec<ArrayRef>],
+ allocated: &mut usize,
+ ) -> Result<()> {
+ // 2.1 for each key in this batch
+ // 2.2 for each aggregation
+ // 2.3 `slice` from each of its arrays the keys' values
+ // 2.4 update / merge the accumulator with the values
+ // 2.5 clear indices
+ groups_with_rows
+ .iter()
+ .zip(offsets.windows(2))
+ .try_for_each(|(group_idx, offsets)| {
+ let group_state = &mut self.row_aggr_state.group_states[*group_idx];
+ // 2.2
+ self.row_accumulators
+ .iter_mut()
+ .zip(row_values.iter())
+ .map(|(accumulator, aggr_array)| {
+ (
+ accumulator,
+ aggr_array
+ .iter()
+ .map(|array| {
+ // 2.3
+ array.slice(offsets[0], offsets[1] - offsets[0])
+ })
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .try_for_each(|(accumulator, values)| {
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor
+ .point_to(0, group_state.aggregation_buffer.as_mut_slice());
+ match self.mode {
+ AggregateMode::Partial => {
+ accumulator.update_batch(&values, &mut state_accessor)
+ }
+ AggregateMode::FinalPartitioned | AggregateMode::Final => {
+ // note: the aggregation here is over states, not values, thus the merge
+ accumulator.merge_batch(&values, &mut state_accessor)
+ }
+ }
+ })
+ // 2.5
+ .and(Ok(()))?;
+ // normal accumulators
+ group_state
+ .accumulator_set
+ .iter_mut()
+ .zip(normal_values.iter())
+ .map(|(accumulator, aggr_array)| {
+ (
+ accumulator,
+ aggr_array
+ .iter()
+ .map(|array| {
+ // 2.3
+ array.slice(offsets[0], offsets[1] - offsets[0])
+ })
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .try_for_each(|(accumulator, values)| {
+ let size_pre = accumulator.size();
+ let res = match self.mode {
+ AggregateMode::Partial => accumulator.update_batch(&values),
+ AggregateMode::FinalPartitioned | AggregateMode::Final => {
+ // note: the aggregation here is over states, not values, thus the merge
+ accumulator.merge_batch(&values)
+ }
+ };
+ let size_post = accumulator.size();
+ *allocated += size_post.saturating_sub(size_pre);
+ res
+ })
+ // 2.5
+ .and({
+ group_state.indices.clear();
+ Ok(())
+ })
+ })?;
+ Ok(())
+ }
+
/// Perform group-by aggregation for the given [`RecordBatch`].
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -303,11 +488,6 @@ impl GroupedHashAggregateStream {
let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
// Keep track of memory allocated:
let mut allocated = 0usize;
- let RowAggregationState {
- map: row_map,
- group_states: row_group_states,
- ..
- } = &mut self.row_aggr_state;
// Evaluate the aggregation expressions.
// We could evaluate them after the `take`, but since we need to evaluate all
@@ -319,192 +499,31 @@ impl GroupedHashAggregateStream {
let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
- let group_rows = self.row_converter.convert_columns(group_values)?;
-
- // 1.1 construct the key from the group values
- // 1.2 construct the mapping key if it does not exist
- // 1.3 add the row' index to `indices`
-
- // track which entries in `aggr_state` have rows in this batch to aggregate
- let mut groups_with_rows = vec![];
-
- // 1.1 Calculate the group keys for the group values
- let mut batch_hashes = vec![0; batch.num_rows()];
- create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-
- for (row, hash) in batch_hashes.into_iter().enumerate() {
- let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- let group_state = &row_group_states[*group_idx];
- group_rows.row(row) == group_state.group_by_values.row()
- });
-
- match entry {
- // Existing entry for this group value
- Some((_hash, group_idx)) => {
- let group_state = &mut row_group_states[*group_idx];
-
- // 1.3
- if group_state.indices.is_empty() {
- groups_with_rows.push(*group_idx);
- };
-
- group_state
- .indices
- .push_accounted(row as u32, &mut allocated); // remember this row
- }
- // 1.2 Need to create new entry
- None => {
- let accumulator_set =
- aggregates::create_accumulators(&self.normal_aggr_expr)?;
- // Add new entry to group_states and save newly created index
- let group_state = RowGroupState {
- group_by_values: group_rows.row(row).owned(),
- aggregation_buffer: vec![
- 0;
- self.row_aggr_layout
- .fixed_part_width()
- ],
- accumulator_set,
- indices: vec![row as u32], // 1.3
- };
- let group_idx = row_group_states.len();
-
- // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
- // `group_states` (see allocation down below)
- allocated += (std::mem::size_of::<u8>()
- * group_state.group_by_values.as_ref().len())
- + (std::mem::size_of::<u8>()
- * group_state.aggregation_buffer.capacity())
- + (std::mem::size_of::<u32>()
- * group_state.indices.capacity());
-
- // Allocation done by normal accumulators
- allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
- * group_state.accumulator_set.capacity())
- + group_state
- .accumulator_set
- .iter()
- .map(|accu| accu.size())
- .sum::<usize>();
-
- // for hasher function, use precomputed hash value
- row_map.insert_accounted(
- (hash, group_idx),
- |(hash, _group_index)| *hash,
- &mut allocated,
- );
-
- row_group_states.push_accounted(group_state, &mut allocated);
-
- groups_with_rows.push(group_idx);
- }
- };
- }
+ let groups_with_rows =
+ self.update_group_state(group_values, &mut allocated)?;
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
- let indices = &row_group_states[group_idx].indices;
+ let indices = &self.row_aggr_state.group_states[group_idx].indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
- let row_values = get_at_indices(&row_aggr_input_values, &batch_indices);
- let normal_values = get_at_indices(&normal_aggr_input_values, &batch_indices);
-
- // 2.1 for each key in this batch
- // 2.2 for each aggregation
- // 2.3 `slice` from each of its arrays the keys' values
- // 2.4 update / merge the accumulator with the values
- // 2.5 clear indices
- groups_with_rows
- .iter()
- .zip(offsets.windows(2))
- .try_for_each(|(group_idx, offsets)| {
- let group_state = &mut row_group_states[*group_idx];
- // 2.2
- self.row_accumulators
- .iter_mut()
- .zip(row_values.iter())
- .map(|(accumulator, aggr_array)| {
- (
- accumulator,
- aggr_array
- .iter()
- .map(|array| {
- // 2.3
- array.slice(offsets[0], offsets[1] - offsets[0])
- })
- .collect::<Vec<ArrayRef>>(),
- )
- })
- .try_for_each(|(accumulator, values)| {
- let mut state_accessor = RowAccessor::new_from_layout(
- self.row_aggr_layout.clone(),
- );
- state_accessor.point_to(
- 0,
- group_state.aggregation_buffer.as_mut_slice(),
- );
- match self.mode {
- AggregateMode::Partial => {
- accumulator.update_batch(&values, &mut state_accessor)
- }
- AggregateMode::FinalPartitioned
- | AggregateMode::Final => {
- // note: the aggregation here is over states, not values, thus the merge
- accumulator.merge_batch(&values, &mut state_accessor)
- }
- }
- })
- // 2.5
- .and(Ok(()))?;
- // normal accumulators
- group_state
- .accumulator_set
- .iter_mut()
- .zip(normal_values.iter())
- .map(|(accumulator, aggr_array)| {
- (
- accumulator,
- aggr_array
- .iter()
- .map(|array| {
- // 2.3
- array.slice(offsets[0], offsets[1] - offsets[0])
- })
- .collect::<Vec<ArrayRef>>(),
- )
- })
- .try_for_each(|(accumulator, values)| {
- let size_pre = accumulator.size();
- let res = match self.mode {
- AggregateMode::Partial => {
- accumulator.update_batch(&values)
- }
- AggregateMode::FinalPartitioned
- | AggregateMode::Final => {
- // note: the aggregation here is over states, not values, thus the merge
- accumulator.merge_batch(&values)
- }
- };
- let size_post = accumulator.size();
- allocated += size_post.saturating_sub(size_pre);
- res
- })
- // 2.5
- .and({
- group_state.indices.clear();
- Ok(())
- })
- })?;
+ let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
+ let normal_values =
+ get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+ self.update_accumulators(
+ &groups_with_rows,
+ &offsets,
+ &row_values,
+ &normal_values,
+ &mut allocated,
+ )?;
}
allocated += self
.row_converter
@@ -699,23 +718,11 @@ fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) -> Vec<Ar
}
fn get_at_indices(
- input_values: &[Vec<Arc<dyn Array>>],
+ input_values: &[Vec<ArrayRef>],
batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Vec<Arc<dyn Array>>> {
+) -> Result<Vec<Vec<ArrayRef>>> {
input_values
.iter()
- .map(|array| {
- array
- .iter()
- .map(|array| {
- compute::take(
- array.as_ref(),
- batch_indices,
- None, // None: no index check
- )
- .unwrap()
- })
- .collect()
- })
+ .map(|array| get_arrayref_at_indices(array, batch_indices))
.collect()
}