You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2023/04/20 01:41:58 UTC
[arrow-datafusion] branch main updated: Row accumulator support update Scalar values (#6003)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 10b0eff7df Row accumulator support update Scalar values (#6003)
10b0eff7df is described below
commit 10b0eff7dfbe195a724285bc166b20240e8ebccb
Author: mingmwang <mi...@gmail.com>
AuthorDate: Thu Apr 20 09:41:51 2023 +0800
Row accumulator support update Scalar values (#6003)
* support update RowAccumulators using Scalar values
* fix group by count multi exprs
* refine hot path, avoid Vec creation
* fix UT
* resolve review comments
* remove redundant null check
---
datafusion/common/src/scalar.rs | 2 +-
.../core/src/physical_plan/aggregates/row_hash.rs | 146 ++++++++++++++++-----
datafusion/core/tests/sql/aggregates.rs | 51 +++++++
datafusion/physical-expr/src/aggregate/average.rs | 23 +++-
datafusion/physical-expr/src/aggregate/count.rs | 25 ++++
datafusion/physical-expr/src/aggregate/min_max.rs | 40 +++++-
.../physical-expr/src/aggregate/row_accumulator.rs | 14 ++
datafusion/physical-expr/src/aggregate/sum.rs | 67 +++++++++-
datafusion/row/src/accessor.rs | 3 +
9 files changed, 330 insertions(+), 41 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index c22616883c..f313e662da 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1443,7 +1443,7 @@ impl std::hash::Hash for ScalarValue {
/// return a reference to the values array and the index into it for a
/// dictionary array
#[inline]
-fn get_dict_value<K: ArrowDictionaryKeyType>(
+pub fn get_dict_value<K: ArrowDictionaryKeyType>(
array: &dyn Array,
index: usize,
) -> (&ArrayRef, Option<usize>) {
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index d9e42e478d..bf1846ae98 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -31,6 +31,7 @@ use futures::stream::{Stream, StreamExt};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
@@ -38,9 +39,7 @@ use crate::physical_plan::aggregates::{
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
+use arrow::array::*;
use arrow::compute::{cast, filter};
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
@@ -53,6 +52,7 @@ use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;
+use itertools::izip;
/// Grouping aggregate with row-format aggregation states inside.
///
@@ -409,7 +409,7 @@ impl GroupedHashAggregateStream {
// Update the accumulator results, according to row_aggr_state.
#[allow(clippy::too_many_arguments)]
- fn update_accumulators(
+ fn update_accumulators_using_batch(
&mut self,
groups_with_rows: &[usize],
offsets: &[usize],
@@ -490,6 +490,55 @@ impl GroupedHashAggregateStream {
Ok(())
}
+ // Update the accumulator results, according to row_aggr_state.
+ fn update_accumulators_using_scalar(
+ &mut self,
+ groups_with_rows: &[usize],
+ row_values: &[Vec<ArrayRef>],
+ row_filter_values: &[Option<ArrayRef>],
+ ) -> Result<()> {
+ let filter_bool_array = row_filter_values
+ .iter()
+ .map(|filter_opt| match filter_opt {
+ Some(f) => Ok(Some(as_boolean_array(f)?)),
+ None => Ok(None),
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ for group_idx in groups_with_rows {
+ let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+ for idx in &group_state.indices {
+ for (accumulator, values_array, filter_array) in izip!(
+ self.row_accumulators.iter_mut(),
+ row_values.iter(),
+ filter_bool_array.iter()
+ ) {
+ if values_array.len() == 1 {
+ let scalar_value =
+ col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
+ accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
+ } else {
+ let scalar_values = values_array
+ .iter()
+ .map(|array| {
+ col_to_scalar(array, filter_array, *idx as usize)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ accumulator
+ .update_scalar_values(&scalar_values, &mut state_accessor)?;
+ }
+ }
+ }
+ // clear the group indices in this group
+ group_state.indices.clear();
+ }
+
+ Ok(())
+ }
+
/// Perform group-by aggregation for the given [`RecordBatch`].
///
/// If successful, this returns the additional number of bytes that were allocated during this process.
@@ -515,35 +564,50 @@ impl GroupedHashAggregateStream {
for group_values in &group_by_values {
let groups_with_rows =
self.update_group_state(group_values, &mut allocated)?;
-
- // Collect all indices + offsets based on keys in this vec
- let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
- let mut offsets = vec![0];
- let mut offset_so_far = 0;
- for &group_idx in groups_with_rows.iter() {
- let indices = &self.aggr_state.group_states[group_idx].indices;
- batch_indices.append_slice(indices);
- offset_so_far += indices.len();
- offsets.push(offset_so_far);
+ // Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are met:
+ // 1) The aggregation mode is Partial or Single
+ // 2) There are no normal aggregation expressions
+ // 3) The number of affected groups is high (entries in `aggr_state` have rows need to update). Usually the high cardinality case
+ if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
+ && normal_aggr_input_values.is_empty()
+ && normal_filter_values.is_empty()
+ && groups_with_rows.len() >= batch.num_rows() / 10
+ {
+ self.update_accumulators_using_scalar(
+ &groups_with_rows,
+ &row_aggr_input_values,
+ &row_filter_values,
+ )?;
+ } else {
+ // Collect all indices + offsets based on keys in this vec
+ let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
+ let mut offsets = vec![0];
+ let mut offset_so_far = 0;
+ for &group_idx in groups_with_rows.iter() {
+ let indices = &self.aggr_state.group_states[group_idx].indices;
+ batch_indices.append_slice(indices);
+ offset_so_far += indices.len();
+ offsets.push(offset_so_far);
+ }
+ let batch_indices = batch_indices.finish();
+
+ let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
+ let normal_values =
+ get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+ let row_filter_values =
+ get_optional_filters(&row_filter_values, &batch_indices);
+ let normal_filter_values =
+ get_optional_filters(&normal_filter_values, &batch_indices);
+ self.update_accumulators_using_batch(
+ &groups_with_rows,
+ &offsets,
+ &row_values,
+ &normal_values,
+ &row_filter_values,
+ &normal_filter_values,
+ &mut allocated,
+ )?;
}
- let batch_indices = batch_indices.finish();
-
- let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
- let normal_values =
- get_at_indices(&normal_aggr_input_values, &batch_indices)?;
- let row_filter_values =
- get_optional_filters(&row_filter_values, &batch_indices);
- let normal_filter_values =
- get_optional_filters(&normal_filter_values, &batch_indices);
- self.update_accumulators(
- &groups_with_rows,
- &offsets,
- &row_values,
- &normal_values,
- &row_filter_values,
- &normal_filter_values,
- &mut allocated,
- )?;
}
allocated += self
.row_converter
@@ -791,3 +855,21 @@ fn slice_and_maybe_filter(
};
Ok(filtered_arrays)
}
+
+/// This method is similar to ScalarValue::try_from_array except for the Null handling.
+/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
+fn col_to_scalar(
+ array: &ArrayRef,
+ filter: &Option<&BooleanArray>,
+ row_index: usize,
+) -> Result<ScalarValue> {
+ if array.is_null(row_index) {
+ return Ok(ScalarValue::Null);
+ }
+ if let Some(filter) = filter {
+ if !filter.value(row_index) {
+ return Ok(ScalarValue::Null);
+ }
+ }
+ ScalarValue::try_from_array(array, row_index)
+}
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 10f838e249..e847ea0c0e 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -543,6 +543,57 @@ async fn count_multi_expr() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn count_multi_expr_group_by() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, true),
+ Field::new("c2", DataType::Int32, true),
+ Field::new("c3", DataType::Int32, true),
+ ]));
+
+ let data = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![
+ Some(0),
+ None,
+ Some(1),
+ Some(2),
+ None,
+ ])),
+ Arc::new(Int32Array::from(vec![
+ Some(1),
+ Some(1),
+ Some(0),
+ None,
+ None,
+ ])),
+ Arc::new(Int32Array::from(vec![
+ Some(10),
+ Some(10),
+ Some(10),
+ Some(10),
+ Some(10),
+ ])),
+ ],
+ )?;
+
+ let ctx = SessionContext::new();
+ ctx.register_batch("test", data)?;
+ let sql = "SELECT c3, count(c1, c2) FROM test group by c3";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+----+------------------------+",
+ "| c3 | COUNT(test.c1,test.c2) |",
+ "+----+------------------------+",
+ "| 10 | 2 |",
+ "+----+------------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &actual);
+ Ok(())
+}
+
#[tokio::test]
async fn simple_avg() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f898214b4b..2fe44602d8 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -299,8 +299,24 @@ impl RowAccumulator for AvgRowAccumulator {
self.state_index() + 1,
accessor,
&sum::sum_batch(values, &self.sum_datatype)?,
- )?;
- Ok(())
+ )
+ }
+
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ let value = &values[0];
+ sum::update_avg_to_row(self.state_index(), accessor, value)
+ }
+
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ sum::update_avg_to_row(self.state_index(), accessor, value)
}
fn merge_batch(
@@ -315,8 +331,7 @@ impl RowAccumulator for AvgRowAccumulator {
// sum
let difference = sum::sum_batch(&states[1], &self.sum_datatype)?;
- sum::add_to_row(self.state_index() + 1, accessor, &difference)?;
- Ok(())
+ sum::add_to_row(self.state_index() + 1, accessor, &difference)
}
fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index c00520516a..15df28b4e3 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -242,6 +242,31 @@ impl RowAccumulator for CountRowAccumulator {
Ok(())
}
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ if !values.iter().any(|s| matches!(s, ScalarValue::Null)) {
+ accessor.add_u64(self.state_index, 1)
+ }
+ Ok(())
+ }
+
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ match value {
+ ScalarValue::Null => {
+ // do not update the accumulator
+ }
+ _ => accessor.add_u64(self.state_index, 1),
+ }
+ Ok(())
+ }
+
fn merge_batch(
&mut self,
states: &[ArrayRef],
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index e695ac400d..3a3d529839 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -565,6 +565,9 @@ macro_rules! min_max_v2 {
ScalarValue::Decimal128(rhs, ..) => {
typed_min_max_v2!($INDEX, $ACC, rhs, i128, $OP)
}
+ ScalarValue::Null => {
+ // do nothing
+ }
e => {
return Err(DataFusionError::Internal(format!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
@@ -709,8 +712,24 @@ impl RowAccumulator for MaxRowAccumulator {
) -> Result<()> {
let values = &values[0];
let delta = &max_batch(values)?;
- max_row(self.index, accessor, delta)?;
- Ok(())
+ max_row(self.index, accessor, delta)
+ }
+
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ let value = &values[0];
+ max_row(self.index, accessor, value)
+ }
+
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ max_row(self.index, accessor, value)
}
fn merge_batch(
@@ -956,6 +975,23 @@ impl RowAccumulator for MinRowAccumulator {
Ok(())
}
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ let value = &values[0];
+ min_row(self.index, accessor, value)
+ }
+
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ min_row(self.index, accessor, value)
+ }
+
fn merge_batch(
&mut self,
states: &[ArrayRef],
diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
index 00717a113f..19e847b3e7 100644
--- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs
+++ b/datafusion/physical-expr/src/aggregate/row_accumulator.rs
@@ -51,6 +51,20 @@ pub trait RowAccumulator: Send + Sync + Debug {
accessor: &mut RowAccessor,
) -> Result<()>;
+ /// updates the accumulator's state from a vector of Scalar values.
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()>;
+
+ /// updates the accumulator's state from a Scalar value.
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()>;
+
/// updates the accumulator's state from a vector of states.
fn merge_batch(
&mut self,
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index abf67933eb..e08726e465 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -240,12 +240,26 @@ macro_rules! sum_row {
}};
}
+macro_rules! avg_row {
+ ($INDEX:ident, $ACC:ident, $DELTA:expr, $TYPE:ident) => {{
+ paste::item! {
+ if let Some(v) = $DELTA {
+ $ACC.add_u64($INDEX, 1);
+ $ACC.[<add_ $TYPE>]($INDEX + 1, *v)
+ }
+ }
+ }};
+}
+
pub(crate) fn add_to_row(
index: usize,
accessor: &mut RowAccessor,
s: &ScalarValue,
) -> Result<()> {
match s {
+ ScalarValue::Null => {
+ // do nothing
+ }
ScalarValue::Float64(rhs) => {
sum_row!(index, accessor, rhs, f64)
}
@@ -270,6 +284,39 @@ pub(crate) fn add_to_row(
Ok(())
}
+pub(crate) fn update_avg_to_row(
+ index: usize,
+ accessor: &mut RowAccessor,
+ s: &ScalarValue,
+) -> Result<()> {
+ match s {
+ ScalarValue::Null => {
+ // do nothing
+ }
+ ScalarValue::Float64(rhs) => {
+ avg_row!(index, accessor, rhs, f64)
+ }
+ ScalarValue::Float32(rhs) => {
+ avg_row!(index, accessor, rhs, f32)
+ }
+ ScalarValue::UInt64(rhs) => {
+ avg_row!(index, accessor, rhs, u64)
+ }
+ ScalarValue::Int64(rhs) => {
+ avg_row!(index, accessor, rhs, i64)
+ }
+ ScalarValue::Decimal128(rhs, _, _) => {
+ avg_row!(index, accessor, rhs, i128)
+ }
+ _ => {
+ let msg =
+ format!("Row avg updater is not expected to receive a scalar {s:?}");
+ return Err(DataFusionError::Internal(msg));
+ }
+ }
+ Ok(())
+}
+
impl Accumulator for SumAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.sum.clone(), ScalarValue::from(self.count)])
@@ -331,8 +378,24 @@ impl RowAccumulator for SumRowAccumulator {
) -> Result<()> {
let values = &values[0];
let delta = sum_batch(values, &self.datatype)?;
- add_to_row(self.index, accessor, &delta)?;
- Ok(())
+ add_to_row(self.index, accessor, &delta)
+ }
+
+ fn update_scalar_values(
+ &mut self,
+ values: &[ScalarValue],
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ let value = &values[0];
+ add_to_row(self.index, accessor, value)
+ }
+
+ fn update_scalar(
+ &mut self,
+ value: &ScalarValue,
+ accessor: &mut RowAccessor,
+ ) -> Result<()> {
+ add_to_row(self.index, accessor, value)
}
fn merge_batch(
diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs
index bba44f0e56..14a7ca264c 100644
--- a/datafusion/row/src/accessor.rs
+++ b/datafusion/row/src/accessor.rs
@@ -71,6 +71,7 @@ macro_rules! fn_add_idx {
($NATIVE: ident) => {
paste::item! {
/// add field at `idx` with `value`
+ #[inline(always)]
pub fn [<add_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
if self.is_valid_at(idx) {
self.[<set_ $NATIVE>](idx, value + self.[<get_ $NATIVE>](idx));
@@ -87,6 +88,7 @@ macro_rules! fn_max_min_idx {
($NATIVE: ident, $OP: ident) => {
paste::item! {
/// check max then update
+ #[inline(always)]
pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
if self.is_valid_at(idx) {
let v = value.$OP(self.[<get_ $NATIVE>](idx));
@@ -103,6 +105,7 @@ macro_rules! fn_max_min_idx {
macro_rules! fn_get_idx_scalar {
($NATIVE: ident, $SCALAR:ident) => {
paste::item! {
+ #[inline(always)]
pub fn [<get_ $NATIVE _scalar>](&self, idx: usize) -> ScalarValue {
if self.is_valid_at(idx) {
ScalarValue::$SCALAR(Some(self.[<get_ $NATIVE>](idx)))