You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2023/04/20 01:41:58 UTC

[arrow-datafusion] branch main updated: Row accumulator support update Scalar values (#6003)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 10b0eff7df Row accumulator support update Scalar values (#6003)
10b0eff7df is described below

commit 10b0eff7dfbe195a724285bc166b20240e8ebccb
Author: mingmwang <mi...@gmail.com>
AuthorDate: Thu Apr 20 09:41:51 2023 +0800

    Row accumulator support update Scalar values (#6003)
    
    * support update RowAccumulators using Scalar values
    
    * fix group by count multi exprs
    
    * refine hot path, avoid Vec creation
    
    * fix UT
    
    * resolve review comments
    
    * remove redundant null check
---
 datafusion/common/src/scalar.rs                    |   2 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  | 146 ++++++++++++++++-----
 datafusion/core/tests/sql/aggregates.rs            |  51 +++++++
 datafusion/physical-expr/src/aggregate/average.rs  |  23 +++-
 datafusion/physical-expr/src/aggregate/count.rs    |  25 ++++
 datafusion/physical-expr/src/aggregate/min_max.rs  |  40 +++++-
 .../physical-expr/src/aggregate/row_accumulator.rs |  14 ++
 datafusion/physical-expr/src/aggregate/sum.rs      |  67 +++++++++-
 datafusion/row/src/accessor.rs                     |   3 +
 9 files changed, 330 insertions(+), 41 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index c22616883c..f313e662da 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1443,7 +1443,7 @@ impl std::hash::Hash for ScalarValue {
 /// return a reference to the values array and the index into it for a
 /// dictionary array
 #[inline]
-fn get_dict_value<K: ArrowDictionaryKeyType>(
+pub fn get_dict_value<K: ArrowDictionaryKeyType>(
     array: &dyn Array,
     index: usize,
 ) -> (&ArrayRef, Option<usize>) {
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index d9e42e478d..bf1846ae98 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -31,6 +31,7 @@ use futures::stream::{Stream, StreamExt};
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
     AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
@@ -38,9 +39,7 @@ use crate::physical_plan::aggregates::{
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
+use arrow::array::*;
 use arrow::compute::{cast, filter};
 use arrow::datatypes::{DataType, Schema, UInt32Type};
 use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
@@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
 use datafusion_row::MutableRecordBatch;
 use hashbrown::raw::RawTable;
+use itertools::izip;
 
 /// Grouping aggregate with row-format aggregation states inside.
 ///
@@ -409,7 +409,7 @@ impl GroupedHashAggregateStream {
 
     // Update the accumulator results, according to row_aggr_state.
     #[allow(clippy::too_many_arguments)]
-    fn update_accumulators(
+    fn update_accumulators_using_batch(
         &mut self,
         groups_with_rows: &[usize],
         offsets: &[usize],
@@ -490,6 +490,55 @@ impl GroupedHashAggregateStream {
         Ok(())
     }
 
+    // Update the accumulator results row by row using scalar values, according to row_aggr_state.
+    fn update_accumulators_using_scalar(
+        &mut self,
+        groups_with_rows: &[usize],
+        row_values: &[Vec<ArrayRef>],
+        row_filter_values: &[Option<ArrayRef>],
+    ) -> Result<()> {
+        let filter_bool_array = row_filter_values
+            .iter()
+            .map(|filter_opt| match filter_opt {
+                Some(f) => Ok(Some(as_boolean_array(f)?)),
+                None => Ok(None),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        for group_idx in groups_with_rows {
+            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let mut state_accessor =
+                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+            for idx in &group_state.indices {
+                for (accumulator, values_array, filter_array) in izip!(
+                    self.row_accumulators.iter_mut(),
+                    row_values.iter(),
+                    filter_bool_array.iter()
+                ) {
+                    if values_array.len() == 1 {
+                        let scalar_value =
+                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
+                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
+                    } else {
+                        let scalar_values = values_array
+                            .iter()
+                            .map(|array| {
+                                col_to_scalar(array, filter_array, *idx as usize)
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        accumulator
+                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
+                    }
+                }
+            }
+            // clear the group indices in this group
+            group_state.indices.clear();
+        }
+
+        Ok(())
+    }
+
     /// Perform group-by aggregation for the given [`RecordBatch`].
     ///
     /// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -515,35 +564,50 @@ impl GroupedHashAggregateStream {
         for group_values in &group_by_values {
             let groups_with_rows =
                 self.update_group_state(group_values, &mut allocated)?;
-
-            // Collect all indices + offsets based on keys in this vec
-            let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
-            let mut offsets = vec![0];
-            let mut offset_so_far = 0;
-            for &group_idx in groups_with_rows.iter() {
-                let indices = &self.aggr_state.group_states[group_idx].indices;
-                batch_indices.append_slice(indices);
-                offset_so_far += indices.len();
-                offsets.push(offset_so_far);
+            // Decide the accumulator update mode: use scalar values to update the accumulators when all of the following conditions are met:
+            // 1) The aggregation mode is Partial or Single
+            // 2) There are no normal aggregation expressions
+            // 3) The number of affected groups is high (many entries in `aggr_state` have rows that need updating), which is usually the high-cardinality case
+            if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
+                && normal_aggr_input_values.is_empty()
+                && normal_filter_values.is_empty()
+                && groups_with_rows.len() >= batch.num_rows() / 10
+            {
+                self.update_accumulators_using_scalar(
+                    &groups_with_rows,
+                    &row_aggr_input_values,
+                    &row_filter_values,
+                )?;
+            } else {
+                // Collect all indices + offsets based on keys in this vec
+                let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
+                let mut offsets = vec![0];
+                let mut offset_so_far = 0;
+                for &group_idx in groups_with_rows.iter() {
+                    let indices = &self.aggr_state.group_states[group_idx].indices;
+                    batch_indices.append_slice(indices);
+                    offset_so_far += indices.len();
+                    offsets.push(offset_so_far);
+                }
+                let batch_indices = batch_indices.finish();
+
+                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                let normal_values =
+                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                let row_filter_values =
+                    get_optional_filters(&row_filter_values, &batch_indices);
+                let normal_filter_values =
+                    get_optional_filters(&normal_filter_values, &batch_indices);
+                self.update_accumulators_using_batch(
+                    &groups_with_rows,
+                    &offsets,
+                    &row_values,
+                    &normal_values,
+                    &row_filter_values,
+                    &normal_filter_values,
+                    &mut allocated,
+                )?;
             }
-            let batch_indices = batch_indices.finish();
-
-            let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-            let normal_values =
-                get_at_indices(&normal_aggr_input_values, &batch_indices)?;
-            let row_filter_values =
-                get_optional_filters(&row_filter_values, &batch_indices);
-            let normal_filter_values =
-                get_optional_filters(&normal_filter_values, &batch_indices);
-            self.update_accumulators(
-                &groups_with_rows,
-                &offsets,
-                &row_values,
-                &normal_values,
-                &row_filter_values,
-                &normal_filter_values,
-                &mut allocated,
-            )?;
         }
         allocated += self
             .row_converter
@@ -791,3 +855,21 @@ fn slice_and_maybe_filter(
     };
     Ok(filtered_arrays)
 }
+
+/// This method is similar to [ScalarValue::try_from_array] except for the null handling:
+/// it returns [ScalarValue::Null] instead of [ScalarValue::Type(None)], including for rows rejected by `filter`.
+fn col_to_scalar(
+    array: &ArrayRef,
+    filter: &Option<&BooleanArray>,
+    row_index: usize,
+) -> Result<ScalarValue> {
+    if array.is_null(row_index) {
+        return Ok(ScalarValue::Null);
+    }
+    if let Some(filter) = filter {
+        if !filter.value(row_index) {
+            return Ok(ScalarValue::Null);
+        }
+    }
+    ScalarValue::try_from_array(array, row_index)
+}
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 10f838e249..e847ea0c0e 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -543,6 +543,57 @@ async fn count_multi_expr() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn count_multi_expr_group_by() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Int32, true),
+        Field::new("c3", DataType::Int32, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![
+                Some(0),
+                None,
+                Some(1),
+                Some(2),
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(1),
+                Some(0),
+                None,
+                None,
+            ])),
+            Arc::new(Int32Array::from(vec![
+                Some(10),
+                Some(10),
+                Some(10),
+                Some(10),
+                Some(10),
+            ])),
+        ],
+    )?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("test", data)?;
+    let sql = "SELECT c3, count(c1, c2) FROM test group by c3";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+----+------------------------+",
+        "| c3 | COUNT(test.c1,test.c2) |",
+        "+----+------------------------+",
+        "| 10 | 2                      |",
+        "+----+------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &actual);
+    Ok(())
+}
+
 #[tokio::test]
 async fn simple_avg() -> Result<()> {
     let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f898214b4b..2fe44602d8 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -299,8 +299,24 @@ impl RowAccumulator for AvgRowAccumulator {
             self.state_index() + 1,
             accessor,
             &sum::sum_batch(values, &self.sum_datatype)?,
-        )?;
-        Ok(())
+        )
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        sum::update_avg_to_row(self.state_index(), accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        sum::update_avg_to_row(self.state_index(), accessor, value)
     }
 
     fn merge_batch(
@@ -315,8 +331,7 @@ impl RowAccumulator for AvgRowAccumulator {
 
         // sum
         let difference = sum::sum_batch(&states[1], &self.sum_datatype)?;
-        sum::add_to_row(self.state_index() + 1, accessor, &difference)?;
-        Ok(())
+        sum::add_to_row(self.state_index() + 1, accessor, &difference)
     }
 
     fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index c00520516a..15df28b4e3 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -242,6 +242,31 @@ impl RowAccumulator for CountRowAccumulator {
         Ok(())
     }
 
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        if !values.iter().any(|s| matches!(s, ScalarValue::Null)) {
+            accessor.add_u64(self.state_index, 1)
+        }
+        Ok(())
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        match value {
+            ScalarValue::Null => {
+                // do not update the accumulator
+            }
+            _ => accessor.add_u64(self.state_index, 1),
+        }
+        Ok(())
+    }
+
     fn merge_batch(
         &mut self,
         states: &[ArrayRef],
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index e695ac400d..3a3d529839 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -565,6 +565,9 @@ macro_rules! min_max_v2 {
             ScalarValue::Decimal128(rhs, ..) => {
                 typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP)
             }
+            ScalarValue::Null => {
+                // do nothing
+            }
             e => {
                 return Err(DataFusionError::Internal(format!(
                     "MIN/MAX is not expected to receive scalars of incompatible types {:?}",
@@ -709,8 +712,24 @@ impl RowAccumulator for MaxRowAccumulator {
     ) -> Result<()> {
         let values = &values[0];
         let delta = &max_batch(values)?;
-        max_row(self.index, accessor, delta)?;
-        Ok(())
+        max_row(self.index, accessor, delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        max_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        max_row(self.index, accessor, value)
     }
 
     fn merge_batch(
@@ -956,6 +975,23 @@ impl RowAccumulator for MinRowAccumulator {
         Ok(())
     }
 
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        min_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        min_row(self.index, accessor, value)
+    }
+
     fn merge_batch(
         &mut self,
         states: &[ArrayRef],
diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
index 00717a113f..19e847b3e7 100644
--- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs
+++ b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
@@ -51,6 +51,20 @@ pub trait RowAccumulator: Send + Sync + Debug {
         accessor: &mut RowAccessor,
     ) -> Result<()>;
 
+    /// updates the accumulator's state from a vector of Scalar values.
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()>;
+
+    /// updates the accumulator's state from a Scalar value.
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()>;
+
     /// updates the accumulator's state from a vector of states.
     fn merge_batch(
         &mut self,
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index abf67933eb..e08726e465 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -240,12 +240,26 @@ macro_rules! sum_row {
     }};
 }
 
+macro_rules! avg_row {
+    ($INDEX:ident, $ACC:ident, $DELTA:expr, $TYPE:ident) => {{
+        paste::item! {
+            if let Some(v) = $DELTA {
+                $ACC.add_u64($INDEX, 1);
+                $ACC.[<add_ $TYPE>]($INDEX + 1, *v)
+            }
+        }
+    }};
+}
+
 pub(crate) fn add_to_row(
     index: usize,
     accessor: &mut RowAccessor,
     s: &ScalarValue,
 ) -> Result<()> {
     match s {
+        ScalarValue::Null => {
+            // do nothing
+        }
         ScalarValue::Float64(rhs) => {
             sum_row!(index, accessor, rhs, f64)
         }
@@ -270,6 +284,39 @@ pub(crate) fn add_to_row(
     Ok(())
 }
 
+pub(crate) fn update_avg_to_row(
+    index: usize,
+    accessor: &mut RowAccessor,
+    s: &ScalarValue,
+) -> Result<()> {
+    match s {
+        ScalarValue::Null => {
+            // do nothing
+        }
+        ScalarValue::Float64(rhs) => {
+            avg_row!(index, accessor, rhs, f64)
+        }
+        ScalarValue::Float32(rhs) => {
+            avg_row!(index, accessor, rhs, f32)
+        }
+        ScalarValue::UInt64(rhs) => {
+            avg_row!(index, accessor, rhs, u64)
+        }
+        ScalarValue::Int64(rhs) => {
+            avg_row!(index, accessor, rhs, i64)
+        }
+        ScalarValue::Decimal128(rhs, _, _) => {
+            avg_row!(index, accessor, rhs, i128)
+        }
+        _ => {
+            let msg =
+                format!("Row avg updater is not expected to receive a scalar {s:?}");
+            return Err(DataFusionError::Internal(msg));
+        }
+    }
+    Ok(())
+}
+
 impl Accumulator for SumAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.sum.clone(), ScalarValue::from(self.count)])
@@ -331,8 +378,24 @@ impl RowAccumulator for SumRowAccumulator {
     ) -> Result<()> {
         let values = &values[0];
         let delta = sum_batch(values, &self.datatype)?;
-        add_to_row(self.index, accessor, &delta)?;
-        Ok(())
+        add_to_row(self.index, accessor, &delta)
+    }
+
+    fn update_scalar_values(
+        &mut self,
+        values: &[ScalarValue],
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        let value = &values[0];
+        add_to_row(self.index, accessor, value)
+    }
+
+    fn update_scalar(
+        &mut self,
+        value: &ScalarValue,
+        accessor: &mut RowAccessor,
+    ) -> Result<()> {
+        add_to_row(self.index, accessor, value)
     }
 
     fn merge_batch(
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index bba44f0e56..14a7ca264c 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -71,6 +71,7 @@ macro_rules! fn_add_idx {
     ($NATIVE: ident) => {
         paste::item! {
             /// add field at `idx` with `value`
+            #[inline(always)]
             pub fn [<add_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
                 if self.is_valid_at(idx) {
                     self.[<set_ $NATIVE>](idx, value + self.[<get_ $NATIVE>](idx));
@@ -87,6 +88,7 @@ macro_rules! fn_max_min_idx {
     ($NATIVE: ident, $OP: ident) => {
         paste::item! {
             /// check max then update
+            #[inline(always)]
             pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
                 if self.is_valid_at(idx) {
                     let v = value.$OP(self.[<get_ $NATIVE>](idx));
@@ -103,6 +105,7 @@ macro_rules! fn_max_min_idx {
 macro_rules! fn_get_idx_scalar {
     ($NATIVE: ident, $SCALAR:ident) => {
         paste::item! {
+            #[inline(always)]
             pub fn [<get_ $NATIVE _scalar>](&self, idx: usize) -> ScalarValue {
                 if self.is_valid_at(idx) {
                     ScalarValue::$SCALAR(Some(self.[<get_ $NATIVE>](idx)))