Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/06 21:03:17 UTC

[arrow-datafusion] branch master updated: feat: use arrow row format for hash-group-by (#4830)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6a2dc68 feat: use arrow row format for hash-group-by (#4830)
bc6a2dc68 is described below

commit bc6a2dc68ca2e8538c9759b919fa2511c6f3f0d8
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Fri Jan 6 22:03:12 2023 +0100

    feat: use arrow row format for hash-group-by (#4830)
    
    For #2723.
    
    This has two effects:
    
    - **wider feature support:** We now use the V2 aggregator for all
      group-column types; the arrow row format's type support is
      sufficient for that (a short sketch of the row format API follows
      this list). V1 is only used when the aggregator itself doesn't
      support V2, and there are still quite a few of those at the
      moment. We'll improve on that front in follow-up PRs.
    - **more speed:** It turns out the arrow row format is also faster
      (see the benchmark results below).
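    
    As a quick illustration of the `arrow::row` API this change builds
    on, here is a minimal sketch with hypothetical data (it assumes an
    arrow-rs release exposing the same `RowConverter` API used in the
    diff below): group columns are encoded into contiguous byte rows
    that can be hashed and compared directly, and decoded back into
    columns when producing the output batch.
    
    ```rust
    use std::sync::Arc;
    
    use arrow::array::{ArrayRef, Int32Array, StringArray};
    use arrow::error::ArrowError;
    use arrow::row::{RowConverter, SortField};
    
    fn main() -> Result<(), ArrowError> {
        // Hypothetical group-by columns: one Utf8 and one Int32 column.
        let cols: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "b", "a"])),
            Arc::new(Int32Array::from(vec![1, 2, 1])),
        ];
    
        // Build a converter from the column data types, mirroring what
        // the patch does with the group schema.
        let mut converter = RowConverter::new(
            cols.iter()
                .map(|c| SortField::new(c.data_type().clone()))
                .collect(),
        )?;
    
        // Encode the columns into a row-oriented buffer; each row is a
        // contiguous byte slice, so group keys compare by raw bytes.
        let rows = converter.convert_columns(&cols)?;
        assert!(rows.row(0) == rows.row(2)); // ("a", 1) == ("a", 1)
        assert!(rows.row(0) != rows.row(1));
    
        // A row can be detached as an `OwnedRow` to be stored as a hash
        // table key, and rows can be decoded back into columns.
        let key = rows.row(1).owned();
        assert!(key.row() == rows.row(1));
        let decoded: Vec<ArrayRef> = converter.convert_rows([rows.row(0)])?;
        assert_eq!(decoded.len(), 2); // two columns, one row each
    
        Ok(())
    }
    ```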
    
    Perf results (mind the noise even in benchmarks that are not
    actually touched by this code change):
    
    ```text
    ❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue2723a-pre
    ...
         Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-fdbe5671f9c3019b)
    aggregate_query_no_group_by 15 12
                            time:   [779.28 µs 782.77 µs 786.28 µs]
                            change: [+2.1375% +2.7672% +3.4171%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 1 outliers among 100 measurements (1.00%)
      1 (1.00%) high mild
    
    aggregate_query_no_group_by_min_max_f64
                            time:   [712.96 µs 715.90 µs 719.14 µs]
                            change: [+0.8379% +1.7648% +2.6345%] (p = 0.00 < 0.05)
                            Change within noise threshold.
    Found 10 outliers among 100 measurements (10.00%)
      3 (3.00%) low mild
      6 (6.00%) high mild
      1 (1.00%) high severe
    
    Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
    aggregate_query_no_group_by_count_distinct_wide
                            time:   [1.7297 ms 1.7399 ms 1.7503 ms]
                            change: [-34.647% -33.908% -33.165%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 5 outliers among 100 measurements (5.00%)
      5 (5.00%) high mild
    
    Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
    aggregate_query_no_group_by_count_distinct_narrow
                            time:   [1.0984 ms 1.1045 ms 1.1115 ms]
                            change: [-38.581% -38.076% -37.569%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 6 outliers among 100 measurements (6.00%)
      1 (1.00%) low mild
      5 (5.00%) high mild
    
    Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.1s, enable flat sampling, or reduce sample count to 50.
    aggregate_query_group_by
                            time:   [1.7810 ms 1.7925 ms 1.8057 ms]
                            change: [-25.252% -24.127% -22.737%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 9 outliers among 100 measurements (9.00%)
      2 (2.00%) low mild
      5 (5.00%) high mild
      2 (2.00%) high severe
    
    Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
    aggregate_query_group_by_with_filter
                            time:   [1.2068 ms 1.2119 ms 1.2176 ms]
                            change: [+2.2847% +3.0857% +3.8789%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 10 outliers among 100 measurements (10.00%)
      1 (1.00%) low mild
      7 (7.00%) high mild
      2 (2.00%) high severe
    
    Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
    aggregate_query_group_by_u64 15 12
                            time:   [1.6762 ms 1.6848 ms 1.6942 ms]
                            change: [-29.598% -28.603% -27.400%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 8 outliers among 100 measurements (8.00%)
      1 (1.00%) low mild
      1 (1.00%) high mild
      6 (6.00%) high severe
    
    Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
    aggregate_query_group_by_with_filter_u64 15 12
                            time:   [1.1969 ms 1.2008 ms 1.2049 ms]
                            change: [+1.3015% +2.1513% +3.0016%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 6 outliers among 100 measurements (6.00%)
      1 (1.00%) low severe
      2 (2.00%) high mild
      3 (3.00%) high severe
    
    aggregate_query_group_by_u64_multiple_keys
                            time:   [14.797 ms 15.112 ms 15.427 ms]
                            change: [-12.072% -8.7274% -5.3392%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 3 outliers among 100 measurements (3.00%)
      3 (3.00%) high mild
    
    aggregate_query_approx_percentile_cont_on_u64
                            time:   [4.1278 ms 4.1687 ms 4.2098 ms]
                            change: [+0.4851% +1.9525% +3.3676%] (p = 0.01 < 0.05)
                            Change within noise threshold.
    Found 2 outliers among 100 measurements (2.00%)
      1 (1.00%) low mild
      1 (1.00%) high mild
    
    aggregate_query_approx_percentile_cont_on_f32
                            time:   [3.4694 ms 3.4967 ms 3.5245 ms]
                            change: [-1.5467% -0.4432% +0.6609%] (p = 0.43 > 0.05)
                            No change in performance detected.
    Found 1 outliers among 100 measurements (1.00%)
      1 (1.00%) high mild
    ```
---
 .../core/src/physical_plan/aggregates/mod.rs       |  5 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  | 57 ++++++++++------------
 datafusion/physical-expr/src/hash_utils.rs         | 33 +++++++++++++
 3 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index cb24193e3..07f3563bb 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -52,7 +52,6 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 use datafusion_physical_expr::equivalence::project_equivalence_properties;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
 use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
-use datafusion_row::{row_supported, RowType};
 
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -273,9 +272,7 @@ impl AggregateExec {
     }
 
     fn row_aggregate_supported(&self) -> bool {
-        let group_schema = group_schema(&self.schema, self.group_by.expr.len());
-        row_supported(&group_schema, RowType::Compact)
-            && accumulator_v2_supported(&self.aggr_expr)
+        accumulator_v2_supported(&self.aggr_expr)
     }
 
     fn execute_typed(
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 94654d502..b27723dbd 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -22,6 +22,8 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use datafusion_physical_expr::hash_utils::create_row_hashes_v2;
 use futures::stream::BoxStream;
 use futures::stream::{Stream, StreamExt};
 
@@ -32,7 +34,6 @@ use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
     PhysicalGroupBy,
 };
-use crate::physical_plan::hash_utils::create_row_hashes;
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
@@ -50,7 +51,6 @@ use datafusion_common::ScalarValue;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::writer::{write_row, RowWriter};
 use datafusion_row::{MutableRecordBatch, RowType};
 use hashbrown::raw::RawTable;
 
@@ -90,7 +90,7 @@ struct GroupedHashAggregateStreamV2Inner {
     group_by: PhysicalGroupBy,
     accumulators: Vec<AccumulatorItemV2>,
 
-    group_schema: SchemaRef,
+    row_converter: RowConverter,
     aggr_schema: SchemaRef,
     aggr_layout: Arc<RowLayout>,
 
@@ -136,6 +136,13 @@ impl GroupedHashAggregateStreamV2 {
         let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
 
         let group_schema = group_schema(&schema, group_by.expr.len());
+        let row_converter = RowConverter::new(
+            group_schema
+                .fields()
+                .iter()
+                .map(|f| SortField::new(f.data_type().clone()))
+                .collect(),
+        )?;
         let aggr_schema = aggr_state_schema(&aggr_expr)?;
 
         let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
@@ -157,7 +164,7 @@ impl GroupedHashAggregateStreamV2 {
             input,
             group_by,
             accumulators,
-            group_schema,
+            row_converter,
             aggr_schema,
             aggr_layout,
             baseline_metrics,
@@ -181,7 +188,7 @@ impl GroupedHashAggregateStreamV2 {
                                 &this.random_state,
                                 &this.group_by,
                                 &mut this.accumulators,
-                                &this.group_schema,
+                                &mut this.row_converter,
                                 this.aggr_layout.clone(),
                                 batch,
                                 &mut this.aggr_state,
@@ -205,7 +212,7 @@ impl GroupedHashAggregateStreamV2 {
                             let timer = this.baseline_metrics.elapsed_compute().timer();
                             let result = create_batch_from_map(
                                 &this.mode,
-                                &this.group_schema,
+                                &this.row_converter,
                                 &this.aggr_schema,
                                 this.batch_size,
                                 this.row_group_skip_position,
@@ -270,7 +277,7 @@ fn group_aggregate_batch(
     random_state: &RandomState,
     grouping_set: &PhysicalGroupBy,
     accumulators: &mut [AccumulatorItemV2],
-    group_schema: &Schema,
+    row_converter: &mut RowConverter,
     state_layout: Arc<RowLayout>,
     batch: RecordBatch,
     aggr_state: &mut AggregationState,
@@ -283,9 +290,10 @@ fn group_aggregate_batch(
         map, group_states, ..
     } = aggr_state;
     let mut allocated = 0usize;
+    let row_converter_size_pre = row_converter.size();
 
     for group_values in grouping_by_values {
-        let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+        let group_rows = row_converter.convert_columns(&group_values)?;
 
         // evaluate the aggregation expressions.
         // We could evaluate them after the `take`, but since we need to evaluate all
@@ -301,7 +309,7 @@ fn group_aggregate_batch(
 
         // 1.1 Calculate the group keys for the group values
         let mut batch_hashes = vec![0; batch.num_rows()];
-        create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+        create_row_hashes_v2(&group_rows, random_state, &mut batch_hashes)?;
 
         for (row, hash) in batch_hashes.into_iter().enumerate() {
             let entry = map.get_mut(hash, |(_hash, group_idx)| {
@@ -309,7 +317,7 @@ fn group_aggregate_batch(
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
                 let group_state = &group_states[*group_idx];
-                group_rows[row] == group_state.group_by_values
+                group_rows.row(row) == group_state.group_by_values.row()
             });
 
             match entry {
@@ -330,7 +338,7 @@ fn group_aggregate_batch(
                 None => {
                     // Add new entry to group_states and save newly created index
                     let group_state = RowGroupState {
-                        group_by_values: group_rows[row].clone(),
+                        group_by_values: group_rows.row(row).owned(),
                         aggregation_buffer: vec![0; state_layout.fixed_part_width()],
                         indices: vec![row as u32], // 1.3
                     };
@@ -339,7 +347,7 @@ fn group_aggregate_batch(
                     // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
                     // `group_states` (see allocation down below)
                     allocated += (std::mem::size_of::<u8>()
-                        * group_state.group_by_values.capacity())
+                        * group_state.group_by_values.as_ref().len())
                         + (std::mem::size_of::<u8>()
                             * group_state.aggregation_buffer.capacity())
                         + (std::mem::size_of::<u32>() * group_state.indices.capacity());
@@ -438,14 +446,16 @@ fn group_aggregate_batch(
             })?;
     }
 
+    allocated += row_converter.size().saturating_sub(row_converter_size_pre);
+
     Ok(allocated)
 }
 
 /// The state that is built for each output group.
 #[derive(Debug)]
 struct RowGroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: Vec<u8>,
+    // Group key.
+    group_by_values: OwnedRow,
 
     // Accumulator state, stored sequentially
     aggregation_buffer: Vec<u8>,
@@ -483,23 +493,11 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
-/// Create grouping rows
-fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
-    let mut writer = RowWriter::new(schema, RowType::Compact);
-    let mut results = vec![];
-    for cur_row in 0..arrays[0].len() {
-        write_row(&mut writer, cur_row, schema, &arrays);
-        results.push(writer.get_row().to_vec());
-        writer.reset()
-    }
-    results
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 #[allow(clippy::too_many_arguments)]
 fn create_batch_from_map(
     mode: &AggregateMode,
-    group_schema: &Schema,
+    converter: &RowConverter,
     aggr_schema: &Schema,
     batch_size: usize,
     skip_items: usize,
@@ -524,11 +522,10 @@ fn create_batch_from_map(
         .iter()
         .skip(skip_items)
         .take(batch_size)
-        .map(|gs| (gs.group_by_values.clone(), gs.aggregation_buffer.clone()))
+        .map(|gs| (gs.group_by_values.row(), gs.aggregation_buffer.clone()))
         .unzip();
 
-    let mut columns: Vec<ArrayRef> =
-        read_as_batch(&group_buffers, group_schema, RowType::Compact);
+    let mut columns: Vec<ArrayRef> = converter.convert_rows(group_buffers)?;
 
     match mode {
         AggregateMode::Partial => columns.extend(read_as_batch(
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index c687eb80e..ab2377da9 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -20,6 +20,7 @@
 use ahash::RandomState;
 use arrow::array::*;
 use arrow::datatypes::*;
+use arrow::row::Rows;
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use arrow_buffer::i256;
 use datafusion_common::{
@@ -249,6 +250,38 @@ pub fn create_hashes<'a>(
     Ok(hashes_buffer)
 }
 
+/// Test version of `create_row_hashes_v2` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details
+#[cfg(feature = "force_hash_collisions")]
+pub fn create_row_hashes_v2<'a>(
+    _rows: &Rows,
+    _random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    Ok(hashes_buffer)
+}
+
+/// Creates hash values for every row, based on their raw bytes.
+#[cfg(not(feature = "force_hash_collisions"))]
+pub fn create_row_hashes_v2<'a>(
+    rows: &Rows,
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+        *hash = random_state.hash_one(rows.row(i));
+    }
+    Ok(hashes_buffer)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;