Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/06 21:03:17 UTC
[arrow-datafusion] branch master updated: feat: use arrow row format for hash-group-by (#4830)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bc6a2dc68 feat: use arrow row format for hash-group-by (#4830)
bc6a2dc68 is described below
commit bc6a2dc68ca2e8538c9759b919fa2511c6f3f0d8
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Fri Jan 6 22:03:12 2023 +0100
feat: use arrow row format for hash-group-by (#4830)
For #2723.
This has two effects:
- **wider feature support:** We now use the V2 aggregator for all
group-column types. The arrow row format support is sufficient for
that. V1 is now used only when an aggregator itself doesn't support V2
(which is still true for quite a few aggregators at the moment). We'll
improve on that front in follow-up PRs.
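- **more speed:** The arrow row format also turns out to be faster:
roughly 24% to 29% on the single-key group-by benchmarks and 34% to 38%
on the count-distinct benchmarks (see the benchmark results below).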
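To illustrate the idea behind both effects, here is a minimal, std-only sketch of grouping on byte-encoded rows: every group key is flattened into one byte string, the raw bytes are hashed, and candidate groups are confirmed by plain byte equality. The names `encode_row` and `group_indices` and the length-prefixed encoding are hypothetical and chosen for illustration only; this is not the arrow row format and not the code in this commit, which relies on `RowConverter` and `create_row_hashes_v2`.

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::BuildHasher;

/// Encode one (u64, &str) group key as a single byte row.
/// Hypothetical encoding for illustration; NOT the real arrow row format.
fn encode_row(id: u64, name: &str) -> Vec<u8> {
    let mut row = Vec::with_capacity(8 + 4 + name.len());
    row.extend_from_slice(&id.to_be_bytes());
    // Length prefix keeps rows with different strings from aliasing.
    row.extend_from_slice(&(name.len() as u32).to_be_bytes());
    row.extend_from_slice(name.as_bytes());
    row
}

/// Assign a dense group index to every input row.
/// Returns the per-row group index and the distinct group keys.
fn group_indices(rows: &[Vec<u8>], state: &RandomState) -> (Vec<usize>, Vec<Vec<u8>>) {
    // Maps a hash value to the indices of all groups that share it.
    let mut buckets: HashMap<u64, Vec<usize>> = HashMap::new();
    let mut group_keys: Vec<Vec<u8>> = Vec::new();
    let mut assignment = Vec::with_capacity(rows.len());

    for row in rows {
        // Hash the raw bytes of the row, independent of the column types.
        let hash = state.hash_one(row.as_slice());
        let candidates = buckets.entry(hash).or_default();
        // Equal hashes may be collisions, so confirm with byte equality.
        let found = candidates.iter().copied().find(|&g| group_keys[g] == *row);
        let group = match found {
            Some(g) => g,
            None => {
                group_keys.push(row.clone());
                candidates.push(group_keys.len() - 1);
                group_keys.len() - 1
            }
        };
        assignment.push(group);
    }
    (assignment, group_keys)
}

fn main() {
    let rows = vec![encode_row(1, "a"), encode_row(2, "b"), encode_row(1, "a")];
    let (assignment, keys) = group_indices(&rows, &RandomState::new());
    println!("{assignment:?}, {} groups", keys.len());
}
```

Because the grouping logic only ever sees bytes, it does not need per-type handling for the group columns; supporting a new column type is purely a matter of the encoder.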
Perf results (note that some benchmarks not touched by this code change
still show small differences, which is noise):
```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue2723a-pre
...
Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-fdbe5671f9c3019b)
aggregate_query_no_group_by 15 12
time: [779.28 µs 782.77 µs 786.28 µs]
change: [+2.1375% +2.7672% +3.4171%] (p = 0.00 < 0.05)
Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
aggregate_query_no_group_by_min_max_f64
time: [712.96 µs 715.90 µs 719.14 µs]
change: [+0.8379% +1.7648% +2.6345%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
3 (3.00%) low mild
6 (6.00%) high mild
1 (1.00%) high severe
Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_wide
time: [1.7297 ms 1.7399 ms 1.7503 ms]
change: [-34.647% -33.908% -33.165%] (p = 0.00 < 0.05)
Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
5 (5.00%) high mild
Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
aggregate_query_no_group_by_count_distinct_narrow
time: [1.0984 ms 1.1045 ms 1.1115 ms]
change: [-38.581% -38.076% -37.569%] (p = 0.00 < 0.05)
Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
1 (1.00%) low mild
5 (5.00%) high mild
Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by
time: [1.7810 ms 1.7925 ms 1.8057 ms]
change: [-25.252% -24.127% -22.737%] (p = 0.00 < 0.05)
Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
2 (2.00%) low mild
5 (5.00%) high mild
2 (2.00%) high severe
Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
time: [1.2068 ms 1.2119 ms 1.2176 ms]
change: [+2.2847% +3.0857% +3.8789%] (p = 0.00 < 0.05)
Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
1 (1.00%) low mild
7 (7.00%) high mild
2 (2.00%) high severe
Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
time: [1.6762 ms 1.6848 ms 1.6942 ms]
change: [-29.598% -28.603% -27.400%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
1 (1.00%) low mild
1 (1.00%) high mild
6 (6.00%) high severe
Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
time: [1.1969 ms 1.2008 ms 1.2049 ms]
change: [+1.3015% +2.1513% +3.0016%] (p = 0.00 < 0.05)
Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
1 (1.00%) low severe
2 (2.00%) high mild
3 (3.00%) high severe
aggregate_query_group_by_u64_multiple_keys
time: [14.797 ms 15.112 ms 15.427 ms]
change: [-12.072% -8.7274% -5.3392%] (p = 0.00 < 0.05)
Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high mild
aggregate_query_approx_percentile_cont_on_u64
time: [4.1278 ms 4.1687 ms 4.2098 ms]
change: [+0.4851% +1.9525% +3.3676%] (p = 0.01 < 0.05)
Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
1 (1.00%) low mild
1 (1.00%) high mild
aggregate_query_approx_percentile_cont_on_f32
time: [3.4694 ms 3.4967 ms 3.5245 ms]
change: [-1.5467% -0.4432% +0.6609%] (p = 0.43 > 0.05)
No change in performance detected.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
```
---
.../core/src/physical_plan/aggregates/mod.rs | 5 +-
.../core/src/physical_plan/aggregates/row_hash.rs | 57 ++++++++++------------
datafusion/physical-expr/src/hash_utils.rs | 33 +++++++++++++
3 files changed, 61 insertions(+), 34 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index cb24193e3..07f3563bb 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -52,7 +52,6 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
use datafusion_physical_expr::equivalence::project_equivalence_properties;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
-use datafusion_row::{row_supported, RowType};
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -273,9 +272,7 @@ impl AggregateExec {
}
fn row_aggregate_supported(&self) -> bool {
- let group_schema = group_schema(&self.schema, self.group_by.expr.len());
- row_supported(&group_schema, RowType::Compact)
- && accumulator_v2_supported(&self.aggr_expr)
+ accumulator_v2_supported(&self.aggr_expr)
}
fn execute_typed(
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 94654d502..b27723dbd 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -22,6 +22,8 @@ use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use datafusion_physical_expr::hash_utils::create_row_hashes_v2;
use futures::stream::BoxStream;
use futures::stream::{Stream, StreamExt};
@@ -32,7 +34,6 @@ use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
PhysicalGroupBy,
};
-use crate::physical_plan::hash_utils::create_row_hashes;
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
@@ -50,7 +51,6 @@ use datafusion_common::ScalarValue;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::writer::{write_row, RowWriter};
use datafusion_row::{MutableRecordBatch, RowType};
use hashbrown::raw::RawTable;
@@ -90,7 +90,7 @@ struct GroupedHashAggregateStreamV2Inner {
group_by: PhysicalGroupBy,
accumulators: Vec<AccumulatorItemV2>,
- group_schema: SchemaRef,
+ row_converter: RowConverter,
aggr_schema: SchemaRef,
aggr_layout: Arc<RowLayout>,
@@ -136,6 +136,13 @@ impl GroupedHashAggregateStreamV2 {
let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
let group_schema = group_schema(&schema, group_by.expr.len());
+ let row_converter = RowConverter::new(
+ group_schema
+ .fields()
+ .iter()
+ .map(|f| SortField::new(f.data_type().clone()))
+ .collect(),
+ )?;
let aggr_schema = aggr_state_schema(&aggr_expr)?;
let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
@@ -157,7 +164,7 @@ impl GroupedHashAggregateStreamV2 {
input,
group_by,
accumulators,
- group_schema,
+ row_converter,
aggr_schema,
aggr_layout,
baseline_metrics,
@@ -181,7 +188,7 @@ impl GroupedHashAggregateStreamV2 {
&this.random_state,
&this.group_by,
&mut this.accumulators,
- &this.group_schema,
+ &mut this.row_converter,
this.aggr_layout.clone(),
batch,
&mut this.aggr_state,
@@ -205,7 +212,7 @@ impl GroupedHashAggregateStreamV2 {
let timer = this.baseline_metrics.elapsed_compute().timer();
let result = create_batch_from_map(
&this.mode,
- &this.group_schema,
+ &this.row_converter,
&this.aggr_schema,
this.batch_size,
this.row_group_skip_position,
@@ -270,7 +277,7 @@ fn group_aggregate_batch(
random_state: &RandomState,
grouping_set: &PhysicalGroupBy,
accumulators: &mut [AccumulatorItemV2],
- group_schema: &Schema,
+ row_converter: &mut RowConverter,
state_layout: Arc<RowLayout>,
batch: RecordBatch,
aggr_state: &mut AggregationState,
@@ -283,9 +290,10 @@ fn group_aggregate_batch(
map, group_states, ..
} = aggr_state;
let mut allocated = 0usize;
+ let row_converter_size_pre = row_converter.size();
for group_values in grouping_by_values {
- let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+ let group_rows = row_converter.convert_columns(&group_values)?;
// evaluate the aggregation expressions.
// We could evaluate them after the `take`, but since we need to evaluate all
@@ -301,7 +309,7 @@ fn group_aggregate_batch(
// 1.1 Calculate the group keys for the group values
let mut batch_hashes = vec![0; batch.num_rows()];
- create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+ create_row_hashes_v2(&group_rows, random_state, &mut batch_hashes)?;
for (row, hash) in batch_hashes.into_iter().enumerate() {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
@@ -309,7 +317,7 @@ fn group_aggregate_batch(
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
let group_state = &group_states[*group_idx];
- group_rows[row] == group_state.group_by_values
+ group_rows.row(row) == group_state.group_by_values.row()
});
match entry {
@@ -330,7 +338,7 @@ fn group_aggregate_batch(
None => {
// Add new entry to group_states and save newly created index
let group_state = RowGroupState {
- group_by_values: group_rows[row].clone(),
+ group_by_values: group_rows.row(row).owned(),
aggregation_buffer: vec![0; state_layout.fixed_part_width()],
indices: vec![row as u32], // 1.3
};
@@ -339,7 +347,7 @@ fn group_aggregate_batch(
// NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
// `group_states` (see allocation down below)
allocated += (std::mem::size_of::<u8>()
- * group_state.group_by_values.capacity())
+ * group_state.group_by_values.as_ref().len())
+ (std::mem::size_of::<u8>()
* group_state.aggregation_buffer.capacity())
+ (std::mem::size_of::<u32>() * group_state.indices.capacity());
@@ -438,14 +446,16 @@ fn group_aggregate_batch(
})?;
}
+ allocated += row_converter.size().saturating_sub(row_converter_size_pre);
+
Ok(allocated)
}
/// The state that is built for each output group.
#[derive(Debug)]
struct RowGroupState {
- /// The actual group by values, stored sequentially
- group_by_values: Vec<u8>,
+ // Group key.
+ group_by_values: OwnedRow,
// Accumulator state, stored sequentially
aggregation_buffer: Vec<u8>,
@@ -483,23 +493,11 @@ impl std::fmt::Debug for AggregationState {
}
}
-/// Create grouping rows
-fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
- let mut writer = RowWriter::new(schema, RowType::Compact);
- let mut results = vec![];
- for cur_row in 0..arrays[0].len() {
- write_row(&mut writer, cur_row, schema, &arrays);
- results.push(writer.get_row().to_vec());
- writer.reset()
- }
- results
-}
-
/// Create a RecordBatch with all group keys and accumulator' states or values.
#[allow(clippy::too_many_arguments)]
fn create_batch_from_map(
mode: &AggregateMode,
- group_schema: &Schema,
+ converter: &RowConverter,
aggr_schema: &Schema,
batch_size: usize,
skip_items: usize,
@@ -524,11 +522,10 @@ fn create_batch_from_map(
.iter()
.skip(skip_items)
.take(batch_size)
- .map(|gs| (gs.group_by_values.clone(), gs.aggregation_buffer.clone()))
+ .map(|gs| (gs.group_by_values.row(), gs.aggregation_buffer.clone()))
.unzip();
- let mut columns: Vec<ArrayRef> =
- read_as_batch(&group_buffers, group_schema, RowType::Compact);
+ let mut columns: Vec<ArrayRef> = converter.convert_rows(group_buffers)?;
match mode {
AggregateMode::Partial => columns.extend(read_as_batch(
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index c687eb80e..ab2377da9 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -20,6 +20,7 @@
use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
+use arrow::row::Rows;
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::i256;
use datafusion_common::{
@@ -249,6 +250,38 @@ pub fn create_hashes<'a>(
Ok(hashes_buffer)
}
+/// Test version of `create_row_hashes_v2` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details
+#[cfg(feature = "force_hash_collisions")]
+pub fn create_row_hashes_v2<'a>(
+ _rows: &Rows,
+ _random_state: &RandomState,
+ hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+ for hash in hashes_buffer.iter_mut() {
+ *hash = 0
+ }
+ Ok(hashes_buffer)
+}
+
+/// Creates hash values for every row, based on their raw bytes.
+#[cfg(not(feature = "force_hash_collisions"))]
+pub fn create_row_hashes_v2<'a>(
+ rows: &Rows,
+ random_state: &RandomState,
+ hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+ for hash in hashes_buffer.iter_mut() {
+ *hash = 0
+ }
+ for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+ *hash = random_state.hash_one(rows.row(i));
+ }
+ Ok(hashes_buffer)
+}
+
#[cfg(test)]
mod tests {
use crate::from_slice::FromSlice;