Posted to commits@arrow.apache.org by dh...@apache.org on 2021/08/12 18:37:49 UTC
[arrow-datafusion] branch master updated: Rework GroupByHash for faster performance and support grouping by nulls (#808)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fa3f099 Rework GroupByHash for faster performance and support grouping by nulls (#808)
fa3f099 is described below
commit fa3f0998dfe2dfedb0ab67665329e177e000fc64
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 12 14:37:44 2021 -0400
Rework GroupByHash for faster performance and support grouping by nulls (#808)
* Implement faster GroupByHash design
* Rewrite to use RawMap per Dandandan suggestion
* remove stub
* Return error with create_accumulators
* Do not memoize group key creation
---
datafusion/Cargo.toml | 2 +-
datafusion/src/physical_plan/hash_aggregate.rs | 458 +++++++------------------
2 files changed, 130 insertions(+), 330 deletions(-)
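
The heart of the rework, visible in the hash_aggregate.rs diff below, is that group keys are no longer serialized to a Vec<u8> and used as HashMap keys. A hashbrown RawTable now stores only (u64 hash, index) pairs, while the actual group-by values, accumulators, and per-batch row indices live in a separate Vec<GroupState>; on a hash match, equality is confirmed against the stored ScalarValues. The sketch below is a minimal, standalone illustration of that probe-or-insert pattern on a single i64 column. It assumes hashbrown 0.11 with the "raw" feature (the Cargo.toml change in the diff below) and substitutes std's RandomState and a plain == comparison for DataFusion's ahash-based create_hashes and ScalarValue::eq_array; GroupState and group_rows here are simplified stand-ins, not the crate's API.

```rust
// Minimal sketch of the probe-or-insert grouping pattern, assuming
// hashbrown = { version = "0.11", features = ["raw"] }.
// GroupState/group_rows are simplified stand-ins for the structures
// added by this commit, not DataFusion's actual API.
use hashbrown::raw::RawTable;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

#[derive(Debug)]
struct GroupState {
    key: i64,          // the group-by value (a Box<[ScalarValue]> in DataFusion)
    indices: Vec<u32>, // rows of the current batch that belong to this group
}

fn group_rows(values: &[i64]) -> Vec<GroupState> {
    let random_state = RandomState::new(); // DataFusion uses ahash::RandomState
    // The table stores (hash, index into group_states); the keys themselves
    // live only in group_states, so any key type can be grouped.
    let mut map: RawTable<(u64, usize)> = RawTable::new();
    let mut group_states: Vec<GroupState> = Vec::new();

    for (row, value) in values.iter().enumerate() {
        // DataFusion computes all row hashes up front with create_hashes()
        let mut hasher = random_state.build_hasher();
        value.hash(&mut hasher);
        let hash = hasher.finish();

        // Probe by hash, then confirm the real key to rule out collisions
        let entry = map.get_mut(hash, |(_h, idx)| group_states[*idx].key == *value);
        match entry {
            Some((_h, idx)) => group_states[*idx].indices.push(row as u32),
            None => {
                let group_idx = group_states.len();
                group_states.push(GroupState {
                    key: *value,
                    indices: vec![row as u32],
                });
                // Reuse the precomputed hash if the table ever rehashes
                map.insert(hash, (hash, group_idx), |(h, _)| *h);
            }
        }
    }
    group_states
}

fn main() {
    for g in group_rows(&[1, 2, 1, 3, 2, 1]) {
        println!("key={} rows={:?}", g.key, g.indices);
    }
}
```

Because the keys never enter the table, the closure passed to insert hands back the stored hash whenever the table resizes, so keys are never re-serialized or rehashed; and since equality in the real code is delegated to ScalarValue::eq_array rather than a byte encoding of the key, grouping on NULL values falls out naturally.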
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 2716cc7..286be8a 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -48,7 +48,7 @@ force_hash_collisions = []
[dependencies]
ahash = "0.7"
-hashbrown = "0.11"
+hashbrown = { version = "0.11", features = ["raw"] }
arrow = { version = "5.1", features = ["prettyprint"] }
parquet = { version = "5.1", features = ["arrow"] }
sqlparser = "0.9.0"
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5c3c576..1c07f61 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -27,44 +27,28 @@ use futures::{
stream::{Stream, StreamExt},
Future,
};
+use hashbrown::HashMap;
use crate::error::{DataFusionError, Result};
+use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::{
Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, PhysicalExpr, SQLMetric,
};
use crate::scalar::ScalarValue;
+use arrow::{array::ArrayRef, compute, compute::cast};
use arrow::{
array::{Array, UInt32Builder},
error::{ArrowError, Result as ArrowResult},
};
use arrow::{
- array::{
- ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
- Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
- },
- compute,
-};
-use arrow::{
- array::{BooleanArray, Date32Array, DictionaryArray},
- compute::cast,
- datatypes::{
- ArrowDictionaryKeyType, ArrowNativeType, Int16Type, Int32Type, Int64Type,
- Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
- },
-};
-use arrow::{
- datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
+ datatypes::{Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
-use hashbrown::HashMap;
+use hashbrown::raw::RawTable;
use pin_project_lite::pin_project;
-use arrow::array::{
- LargeStringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
- TimestampNanosecondArray,
-};
use async_trait::async_trait;
use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
@@ -339,6 +323,7 @@ pin_project! {
fn group_aggregate_batch(
mode: &AggregateMode,
+ random_state: &RandomState,
group_expr: &[Arc<dyn PhysicalExpr>],
aggr_expr: &[Arc<dyn AggregateExpr>],
batch: RecordBatch,
@@ -361,57 +346,74 @@ fn group_aggregate_batch(
group_by_values.push(ScalarValue::UInt32(Some(0)));
}
- let mut group_by_values = group_by_values.into_boxed_slice();
-
- let mut key = Vec::with_capacity(group_values.len());
-
// 1.1 construct the key from the group values
// 1.2 construct the mapping key if it does not exist
// 1.3 add the row's index to `indices`
- // Make sure we can create the accumulators or otherwise return an error
- create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
+ // track which entries in `accumulators` have rows in this batch to aggregate
+ let mut groups_with_rows = vec![];
- // Keys received in this batch
- let mut batch_keys = vec![];
+ // 1.1 Calculate the group keys for the group values
+ let mut batch_hashes = vec![0; batch.num_rows()];
+ create_hashes(&group_values, random_state, &mut batch_hashes)?;
- for row in 0..batch.num_rows() {
- // 1.1
- create_key(&group_values, row, &mut key)
- .map_err(DataFusionError::into_arrow_external_error)?;
+ for (row, hash) in batch_hashes.into_iter().enumerate() {
+ let Accumulators { map, group_states } = &mut accumulators;
- accumulators
- .raw_entry_mut()
- .from_key(&key)
- // 1.3
- .and_modify(|_, (_, _, v)| {
- if v.is_empty() {
- batch_keys.push(key.clone())
+ let entry = map.get_mut(hash, |(_hash, group_idx)| {
+ // verify that a group that we are inserting with hash is
+ // actually the same key value as the group in
+ // existing_idx (aka group_values @ row)
+ let group_state = &group_states[*group_idx];
+ group_values
+ .iter()
+ .zip(group_state.group_by_values.iter())
+ .all(|(array, scalar)| scalar.eq_array(array, row))
+ });
+
+ match entry {
+ // Existing entry for this group value
+ Some((_hash, group_idx)) => {
+ let group_state = &mut group_states[*group_idx];
+ // 1.3
+ if group_state.indices.is_empty() {
+ groups_with_rows.push(*group_idx);
};
- v.push(row as u32)
- })
- // 1.2
- .or_insert_with(|| {
- // We can safely unwrap here as we checked we can create an accumulator before
- let accumulator_set = create_accumulators(aggr_expr).unwrap();
- batch_keys.push(key.clone());
- // Note it would be nice to make this a real error (rather than panic)
- // but it is better than silently ignoring the issue and getting wrong results
- create_group_by_values(&group_values, row, &mut group_by_values)
- .expect("can not create group by value");
- (
- key.clone(),
- (group_by_values.clone(), accumulator_set, vec![row as u32]),
- )
- });
+ group_state.indices.push(row as u32); // remember this row
+ }
+ // 1.2 Need to create new entry
+ None => {
+ let accumulator_set = create_accumulators(aggr_expr)
+ .map_err(DataFusionError::into_arrow_external_error)?;
+
+ // Copy group values out of arrays into `ScalarValue`s
+ let group_by_values = group_values
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, row))
+ .collect::<Result<Vec<_>>>()?;
+
+ // Add new entry to group_states and save newly created index
+ let group_state = GroupState {
+ group_by_values: group_by_values.into_boxed_slice(),
+ accumulator_set,
+ indices: vec![row as u32], // 1.3
+ };
+ let group_idx = group_states.len();
+ group_states.push(group_state);
+ groups_with_rows.push(group_idx);
+
+ // for hasher function, use precomputed hash value
+ map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+ }
+ };
}
// Collect all indices + offsets based on keys in this vec
let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
- for key in batch_keys.iter() {
- let (_, _, indices) = accumulators.get_mut(key).unwrap();
+ for group_idx in groups_with_rows.iter() {
+ let indices = &accumulators.group_states[*group_idx].indices;
batch_indices.append_slice(indices)?;
offset_so_far += indices.len();
offsets.push(offset_so_far);
@@ -442,13 +444,14 @@ fn group_aggregate_batch(
// 2.3 `slice` from each of its arrays the keys' values
// 2.4 update / merge the accumulator with the values
// 2.5 clear indices
- batch_keys
- .iter_mut()
+ groups_with_rows
+ .iter()
.zip(offsets.windows(2))
- .try_for_each(|(key, offsets)| {
- let (_, accumulator_set, indices) = accumulators.get_mut(key).unwrap();
+ .try_for_each(|(group_idx, offsets)| {
+ let group_state = &mut accumulators.group_states[*group_idx];
// 2.2
- accumulator_set
+ group_state
+ .accumulator_set
.iter_mut()
.zip(values.iter())
.map(|(accumulator, aggr_array)| {
@@ -472,238 +475,12 @@ fn group_aggregate_batch(
})
// 2.5
.and({
- indices.clear();
+ group_state.indices.clear();
Ok(())
})
})?;
- Ok(accumulators)
-}
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map for a dictionary type
-///
-/// Note that ideally, for dictionary encoded columns, we would be
-/// able to simply use the dictionary indices themselves (no need to
-/// look up values) or possibly simply build the hash table entirely
-/// on the dictionary indexes.
-///
-/// This approach would likely work (very) well for the common case,
-/// but it also has to handle the case where the dictionary itself
-/// is not the same across all record batches (and thus indexes in one
-/// record batch may not correspond to the same index in another)
-fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
- col: &ArrayRef,
- row: usize,
- vec: &mut Vec<u8>,
-) -> Result<()> {
- let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
- // look up the index in the values dictionary
- let keys_col = dict_col.keys();
- let values_index = keys_col.value(row).to_usize().ok_or_else(|| {
- DataFusionError::Internal(format!(
- "Can not convert index to usize in dictionary of type creating group by value {:?}",
- keys_col.data_type()
- ))
- })?;
-
- create_key_for_col(dict_col.values(), values_index, vec)
-}
-
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map.
-///
-/// NOTE: This function does not check col.is_valid(). Caller must do so
-fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<()> {
- match col.data_type() {
- DataType::Boolean => {
- let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
- vec.extend_from_slice(&[array.value(row) as u8]);
- }
- DataType::Float32 => {
- let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Float64 => {
- let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::UInt8 => {
- let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::UInt16 => {
- let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::UInt32 => {
- let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::UInt64 => {
- let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Int8 => {
- let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Int16 => {
- let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
- vec.extend(array.value(row).to_le_bytes().iter());
- }
- DataType::Int32 => {
- let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Int64 => {
- let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Timestamp(TimeUnit::Millisecond, None) => {
- let array = col
- .as_any()
- .downcast_ref::<TimestampMillisecondArray>()
- .unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
- let array = col
- .as_any()
- .downcast_ref::<TimestampMicrosecondArray>()
- .unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
- let array = col
- .as_any()
- .downcast_ref::<TimestampNanosecondArray>()
- .unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Utf8 => {
- let array = col.as_any().downcast_ref::<StringArray>().unwrap();
- let value = array.value(row);
- // store the size
- vec.extend_from_slice(&value.len().to_le_bytes());
- // store the string value
- vec.extend_from_slice(value.as_bytes());
- }
- DataType::LargeUtf8 => {
- let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
- let value = array.value(row);
- // store the size
- vec.extend_from_slice(&value.len().to_le_bytes());
- // store the string value
- vec.extend_from_slice(value.as_bytes());
- }
- DataType::Date32 => {
- let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
- vec.extend_from_slice(&array.value(row).to_le_bytes());
- }
- DataType::Dictionary(index_type, _) => match **index_type {
- DataType::Int8 => {
- dictionary_create_key_for_col::<Int8Type>(col, row, vec)?;
- }
- DataType::Int16 => {
- dictionary_create_key_for_col::<Int16Type>(col, row, vec)?;
- }
- DataType::Int32 => {
- dictionary_create_key_for_col::<Int32Type>(col, row, vec)?;
- }
- DataType::Int64 => {
- dictionary_create_key_for_col::<Int64Type>(col, row, vec)?;
- }
- DataType::UInt8 => {
- dictionary_create_key_for_col::<UInt8Type>(col, row, vec)?;
- }
- DataType::UInt16 => {
- dictionary_create_key_for_col::<UInt16Type>(col, row, vec)?;
- }
- DataType::UInt32 => {
- dictionary_create_key_for_col::<UInt32Type>(col, row, vec)?;
- }
- DataType::UInt64 => {
- dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
- }
- _ => {
- return Err(DataFusionError::Internal(format!(
- "Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
- col.data_type(),
- )))
- }
- },
- _ => {
- // This is internal because we should have caught this before.
- return Err(DataFusionError::Internal(format!(
- "Unsupported GROUP BY type creating key {}",
- col.data_type(),
- )));
- }
- }
- Ok(())
-}
-
-/// Create a key `Vec<u8>` that is used as key for the hashmap
-///
-/// This looks like
-/// [null_byte][col_value_bytes][null_byte][col_value_bytes]
-///
-/// Note that relatively uncommon patterns (e.g. not 0x00) are chosen
-/// for the null_byte to make debugging easier. The actual values are
-/// arbitrary.
-///
-/// For a NULL value in a column, the key looks like
-/// [0xFE]
-///
-/// For a Non-NULL value in a column, this looks like:
-/// [0xFF][byte representation of column value]
-///
-/// Example of a key with no NULL values:
-/// ```text
-/// 0xFF byte at the start of each column
-/// signifies the value is non-null
-/// │
-///
-/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ┐
-///
-/// │ string len │ 0x1234
-/// { ▼ (as usize le) "foo" ▼(as u16 le)
-/// k1: "foo" ╔ ═┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═┌──┬──┐
-/// k2: 0x1234u16 FF║03│00│00│00│00│00│00│00│"f│"o│"o│FF║34│12│
-/// } ╚ ═└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═└──┴──┘
-/// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
-/// ```
-///
-/// Example of a key with NULL values:
-///
-///```text
-/// 0xFE byte at the start of k1 column
-/// ┌ ─ signifies the value is NULL
-///
-/// └ ┐
-/// 0x1234
-/// { ▼ (as u16 le)
-/// k1: NULL ╔ ═╔ ═┌──┬──┐
-/// k2: 0x1234u16 FE║FF║12│34│
-/// } ╚ ═╚ ═└──┴──┘
-/// 0 1 2 3
-///```
-pub(crate) fn create_key(
- group_by_keys: &[ArrayRef],
- row: usize,
- vec: &mut Vec<u8>,
-) -> Result<()> {
- vec.clear();
- for col in group_by_keys {
- if !col.is_valid(row) {
- vec.push(0xFE);
- } else {
- vec.push(0xFF);
- create_key_for_col(col, row, vec)?
- }
- }
- Ok(())
+ Ok(accumulators)
}
async fn compute_grouped_hash_aggregate(
@@ -720,11 +497,7 @@ async fn compute_grouped_hash_aggregate(
aggregate_expressions(&aggr_expr, &mode, group_expr.len())
.map_err(DataFusionError::into_arrow_external_error)?;
- // mapping key -> (set of accumulators, indices of the key in the batch)
- // * the indexes are updated at each row
- // * the accumulators are updated at the end of each batch
- // * the indexes are `clear`ed at the end of each batch
- //let mut accumulators: Accumulators = FnvHashMap::default();
+ let random_state = RandomState::new();
// iterate over all input batches and update the accumulators
let mut accumulators = Accumulators::default();
@@ -732,6 +505,7 @@ async fn compute_grouped_hash_aggregate(
let batch = batch?;
accumulators = group_aggregate_batch(
&mode,
+ &random_state,
&group_expr,
&aggr_expr,
batch,
@@ -779,8 +553,47 @@ impl GroupedHashAggregateStream {
}
type AccumulatorItem = Box<dyn Accumulator>;
-type Accumulators =
- HashMap<Vec<u8>, (Box<[ScalarValue]>, Vec<AccumulatorItem>, Vec<u32>), RandomState>;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+ /// The actual group by values, one for each group column
+ group_by_values: Box<[ScalarValue]>,
+
+ // Accumulator state, one for each aggregate
+ accumulator_set: Vec<AccumulatorItem>,
+
+ /// scratch space used to collect indices for input rows in a
+ /// batch that have values to aggregate. Reset on each batch
+ indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct Accumulators {
+ /// Logically maps group values to an index in `group_states`
+ ///
+ /// Uses the raw API of hashbrown to avoid actually storing the
+ /// keys in the table
+ ///
+ /// keys: u64 hashes of the GroupValue
+ /// values: (hash, index into `group_states`)
+ map: RawTable<(u64, usize)>,
+
+ /// State for each group
+ group_states: Vec<GroupState>,
+}
+
+impl std::fmt::Debug for Accumulators {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ // hashes are not stored inline, so only a placeholder can be shown
+ let map_string = "RawTable";
+ f.debug_struct("Accumulators")
+ .field("map", &map_string)
+ .field("group_states", &self.group_states)
+ .finish()
+ }
+}
impl Stream for GroupedHashAggregateStream {
type Item = ArrowResult<RecordBatch>;
@@ -903,6 +716,7 @@ pin_project! {
}
}
+/// Special case aggregate with no groups
async fn compute_hash_aggregate(
mode: AggregateMode,
schema: SchemaRef,
@@ -1031,10 +845,10 @@ fn create_batch_from_map(
num_group_expr: usize,
output_schema: &Schema,
) -> ArrowResult<RecordBatch> {
- if accumulators.is_empty() {
+ if accumulators.group_states.is_empty() {
return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
}
- let (_, (_, accs, _)) = accumulators.iter().next().unwrap();
+ let accs = &accumulators.group_states[0].accumulator_set;
let mut acc_data_types: Vec<usize> = vec![];
// Calculate number/shape of state arrays
@@ -1056,8 +870,9 @@ fn create_batch_from_map(
.map(|i| {
ScalarValue::iter_to_array(
accumulators
- .into_iter()
- .map(|(_, (group_by_values, _, _))| group_by_values[i].clone()),
+ .group_states
+ .iter()
+ .map(|group_state| group_state.group_by_values[i].clone()),
)
})
.collect::<Result<Vec<_>>>()
@@ -1068,20 +883,22 @@ fn create_batch_from_map(
for y in 0..state_len {
match mode {
AggregateMode::Partial => {
- let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
- |(_, (_, accumulator, _))| {
- let x = accumulator[x].state().unwrap();
+ let res = ScalarValue::iter_to_array(
+ accumulators.group_states.iter().map(|group_state| {
+ let x = group_state.accumulator_set[x].state().unwrap();
x[y].clone()
- },
- ))
+ }),
+ )
.map_err(DataFusionError::into_arrow_external_error)?;
columns.push(res);
}
AggregateMode::Final | AggregateMode::FinalPartitioned => {
- let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
- |(_, (_, accumulator, _))| accumulator[x].evaluate().unwrap(),
- ))
+ let res = ScalarValue::iter_to_array(
+ accumulators.group_states.iter().map(|group_state| {
+ group_state.accumulator_set[x].evaluate().unwrap()
+ }),
+ )
.map_err(DataFusionError::into_arrow_external_error)?;
columns.push(res);
}
@@ -1140,28 +957,11 @@ fn finalize_aggregation(
}
}
-/// Extract the value in `col[row]` as a GroupByScalar
-fn create_group_by_value(col: &ArrayRef, row: usize) -> Result<ScalarValue> {
- ScalarValue::try_from_array(col, row)
-}
-
-/// Extract the values in `group_by_keys` arrow arrays into the target vector
-/// as GroupByScalar values
-pub(crate) fn create_group_by_values(
- group_by_keys: &[ArrayRef],
- row: usize,
- vec: &mut Box<[ScalarValue]>,
-) -> Result<()> {
- for (i, col) in group_by_keys.iter().enumerate() {
- vec[i] = create_group_by_value(col, row)?
- }
- Ok(())
-}
-
#[cfg(test)]
mod tests {
- use arrow::array::Float64Array;
+ use arrow::array::{Float64Array, UInt32Array};
+ use arrow::datatypes::DataType;
use super::*;
use crate::physical_plan::expressions::{col, Avg};