You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/07/19 10:50:34 UTC
[arrow-datafusion] branch main updated: Consolidate `BoundedAggregateStream` (#6932)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1810a15384 Consolidate `BoundedAggregateStream` (#6932)
1810a15384 is described below
commit 1810a15384791b00c0f796fbdfc8d0d5ddb5fdae
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jul 19 06:50:29 2023 -0400
Consolidate `BoundedAggregateStream` (#6932)
* Consolidate `BoundedAggregateStream`
* Clarify end of input
* Apply suggestions from code review
Co-authored-by: Mustafa Akur <10...@users.noreply.github.com>
* Update diagram for partial sort
* assert input is not done
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* clarify text
* Add more comments about delta memory accounting
---------
Co-authored-by: Mustafa Akur <10...@users.noreply.github.com>
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
.../aggregates/bounded_aggregate_stream.rs | 1080 --------------------
.../core/src/physical_plan/aggregates/mod.rs | 19 +-
.../src/physical_plan/aggregates/order/full.rs | 163 +++
.../core/src/physical_plan/aggregates/order/mod.rs | 139 +++
.../src/physical_plan/aggregates/order/partial.rs | 267 +++++
.../core/src/physical_plan/aggregates/row_hash.rs | 230 +++--
.../core/src/physical_plan/aggregates/utils.rs | 150 ---
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 5 +-
datafusion/execution/src/memory_pool/mod.rs | 47 +-
datafusion/physical-expr/src/aggregate/average.rs | 19 +-
datafusion/physical-expr/src/aggregate/count.rs | 9 +-
.../src/aggregate/groups_accumulator/accumulate.rs | 37 +-
.../src/aggregate/groups_accumulator/adapter.rs | 112 +-
.../src/aggregate/groups_accumulator/bool_op.rs | 25 +-
.../src/aggregate/groups_accumulator/mod.rs | 62 +-
.../src/aggregate/groups_accumulator/prim_op.rs | 12 +-
datafusion/physical-expr/src/lib.rs | 5 +-
17 files changed, 983 insertions(+), 1398 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
deleted file mode 100644
index 5982701e21..0000000000
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ /dev/null
@@ -1,1080 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This file implements streaming aggregation on ordered GROUP BY expressions.
-//! Generated output will itself have an ordering and the executor can run with
-//! bounded memory, ensuring composability in streaming cases.
-
-use std::cmp::min;
-use std::ops::Range;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::vec;
-
-use ahash::RandomState;
-use futures::ready;
-use futures::stream::{Stream, StreamExt};
-use hashbrown::raw::RawTable;
-use itertools::izip;
-
-use crate::physical_plan::aggregates::{
- evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
- AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
-};
-use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use datafusion_execution::TaskContext;
-
-use crate::physical_plan::aggregates::utils::{
- aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
- read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
-use arrow::array::{new_null_array, ArrayRef, UInt32Builder};
-use arrow::compute::{cast, SortColumn};
-use arrow::datatypes::DataType;
-use arrow::row::{OwnedRow, RowConverter, SortField};
-use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx};
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::Accumulator;
-use datafusion_physical_expr::hash_utils::create_hashes;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
-
-use super::AggregateExec;
-
-/// Grouping aggregate with row-format aggregation states inside.
-///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
-/// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when updates over every field are frequent.
-///
-/// The architecture is the following:
-///
-/// 1. For each input RecordBatch, update aggregation states corresponding to all appeared grouping keys.
-/// 2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row
-/// 3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch.
-/// 4. The state's RecordBatch is `merge`d to a new state
-/// 5. The state is mapped to the final value
-///
-/// [Arrow-row]: OwnedRow
-/// [WordAligned]: datafusion_row::layout
-pub(crate) struct BoundedAggregateStream {
- schema: SchemaRef,
- input: SendableRecordBatchStream,
- mode: AggregateMode,
-
- normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- /// Aggregate expressions not supporting row accumulation
- normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
- /// Filter expression for each normal aggregate expression
- normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-
- /// Aggregate expressions supporting row accumulation
- row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
- /// Filter expression for each row aggregate expression
- row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
- row_accumulators: Vec<RowAccumulatorItem>,
- row_converter: RowConverter,
- row_aggr_schema: SchemaRef,
- row_aggr_layout: Arc<RowLayout>,
-
- group_by: PhysicalGroupBy,
-
- aggr_state: AggregationState,
- exec_state: ExecutionState,
- baseline_metrics: BaselineMetrics,
- random_state: RandomState,
- /// size to be used for resulting RecordBatches
- batch_size: usize,
- /// threshold for using `ScalarValue`s to update
- /// accumulators during high-cardinality aggregations for each input batch.
- scalar_update_factor: usize,
- /// if the result is chunked into batches,
- /// last offset is preserved for continuation.
- row_group_skip_position: usize,
- /// keeps range for each accumulator in the field
- /// first element in the array corresponds to normal accumulators
- /// second element in the array corresponds to row accumulators
- indices: [Vec<Range<usize>>; 2],
- /// Information on how the input of this group is ordered
- aggregation_ordering: AggregationOrdering,
- /// Has this stream finished producing output
- is_end: bool,
-}
-
-impl BoundedAggregateStream {
- /// Create a new BoundedAggregateStream
- pub fn new(
- agg: &AggregateExec,
- context: Arc<TaskContext>,
- partition: usize,
- aggregation_ordering: AggregationOrdering, // Stores algorithm mode and output ordering
- ) -> Result<Self> {
- let agg_schema = Arc::clone(&agg.schema);
- let agg_group_by = agg.group_by.clone();
- let agg_filter_expr = agg.filter_expr.clone();
-
- let batch_size = context.session_config().batch_size();
- let scalar_update_factor = context.session_config().agg_scalar_update_factor();
- let input = agg.input.execute(partition, Arc::clone(&context))?;
- let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
-
- let timer = baseline_metrics.elapsed_compute().timer();
-
- let mut start_idx = agg_group_by.expr.len();
- let mut row_aggr_expr = vec![];
- let mut row_agg_indices = vec![];
- let mut row_aggregate_expressions = vec![];
- let mut row_filter_expressions = vec![];
- let mut normal_aggr_expr = vec![];
- let mut normal_agg_indices = vec![];
- let mut normal_aggregate_expressions = vec![];
- let mut normal_filter_expressions = vec![];
- // The expressions to evaluate the batch, one vec of expressions per aggregation.
- // Assuming create_schema() always puts group columns in front of aggregation columns, we set
- // col_idx_base to the group expression count.
- let all_aggregate_expressions =
- aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, start_idx)?;
- let filter_expressions = match agg.mode {
- AggregateMode::Partial
- | AggregateMode::Single
- | AggregateMode::SinglePartitioned => agg_filter_expr,
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- vec![None; agg.aggr_expr.len()]
- }
- };
- for ((expr, others), filter) in agg
- .aggr_expr
- .iter()
- .zip(all_aggregate_expressions.into_iter())
- .zip(filter_expressions.into_iter())
- {
- let n_fields = match agg.mode {
- // In partial aggregation, we keep additional fields in order to successfully
- // merge aggregation results downstream.
- AggregateMode::Partial => expr.state_fields()?.len(),
- _ => 1,
- };
- // Stores range of each expression:
- let aggr_range = Range {
- start: start_idx,
- end: start_idx + n_fields,
- };
- if expr.row_accumulator_supported() {
- row_aggregate_expressions.push(others);
- row_filter_expressions.push(filter.clone());
- row_agg_indices.push(aggr_range);
- row_aggr_expr.push(expr.clone());
- } else {
- normal_aggregate_expressions.push(others);
- normal_filter_expressions.push(filter.clone());
- normal_agg_indices.push(aggr_range);
- normal_aggr_expr.push(expr.clone());
- }
- start_idx += n_fields;
- }
-
- let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?;
-
- let row_aggr_schema = aggr_state_schema(&row_aggr_expr);
-
- let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
- let row_converter = RowConverter::new(
- group_schema
- .fields()
- .iter()
- .map(|f| SortField::new(f.data_type().clone()))
- .collect(),
- )?;
-
- let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
-
- let name = format!("BoundedAggregateStream[{partition}]");
- let aggr_state = AggregationState {
- reservation: MemoryConsumer::new(name).register(context.memory_pool()),
- map: RawTable::with_capacity(0),
- ordered_group_states: Vec::with_capacity(0),
- };
-
- timer.done();
-
- let exec_state = ExecutionState::ReadingInput;
-
- Ok(BoundedAggregateStream {
- schema: agg_schema,
- input,
- mode: agg.mode,
- normal_aggr_expr,
- normal_aggregate_expressions,
- normal_filter_expressions,
- row_aggregate_expressions,
- row_filter_expressions,
- row_accumulators,
- row_converter,
- row_aggr_schema,
- row_aggr_layout,
- group_by: agg_group_by,
- aggr_state,
- exec_state,
- baseline_metrics,
- random_state: Default::default(),
- batch_size,
- scalar_update_factor,
- row_group_skip_position: 0,
- indices: [normal_agg_indices, row_agg_indices],
- is_end: false,
- aggregation_ordering,
- })
- }
-}
-
-impl Stream for BoundedAggregateStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
-
- loop {
- match self.exec_state {
- ExecutionState::ReadingInput => {
- match ready!(self.input.poll_next_unpin(cx)) {
- // new batch to aggregate
- Some(Ok(batch)) => {
- let timer = elapsed_compute.timer();
- let result = self.group_aggregate_batch(batch);
- timer.done();
-
- // allocate memory
- // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
- // overshooting a bit. Also this means we either store the whole record batch or not.
- let result = result.and_then(|allocated| {
- self.aggr_state.reservation.try_grow(allocated)
- });
-
- if let Err(e) = result {
- return Poll::Ready(Some(Err(e)));
- }
- }
- // inner had error, return to caller
- Some(Err(e)) => return Poll::Ready(Some(Err(e))),
- // inner is done, switch to producing output
- None => {
- let states = self.aggr_state.ordered_group_states.iter_mut();
- for state in states {
- state.status = GroupStatus::CanEmit;
- }
- self.exec_state = ExecutionState::ProducingOutput;
- }
- }
- }
-
- ExecutionState::ProducingOutput => {
- let timer = elapsed_compute.timer();
- let result = self.create_batch_from_map();
-
- timer.done();
-
- match result {
- // made output
- Ok(Some(result)) => {
- let batch = result.record_output(&self.baseline_metrics);
- self.row_group_skip_position += batch.num_rows();
- // try to read more input
- self.exec_state = ExecutionState::ReadingInput;
- self.prune();
- return Poll::Ready(Some(Ok(batch)));
- }
- // end of output
- Ok(None) => {
- self.exec_state = ExecutionState::Done;
- }
- // error making output
- Err(error) => return Poll::Ready(Some(Err(error))),
- }
- }
- ExecutionState::Done => return Poll::Ready(None),
- }
- }
- }
-}
-
-impl RecordBatchStream for BoundedAggregateStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-/// This utility object encapsulates the row object, the hash and the group
-/// indices for a group. This information is used when executing streaming
-/// GROUP BY calculations.
-struct GroupOrderInfo {
- /// The group by key
- owned_row: OwnedRow,
- /// the hash value of the group
- hash: u64,
- /// the range of row indices in the input batch that belong to this group
- range: Range<usize>,
-}
-
-impl BoundedAggregateStream {
- /// Update the aggr_state hash table according to group_by values
- /// (result of group_by_expressions) when group by expressions are
- /// fully ordered.
- fn update_fully_ordered_group_state(
- &mut self,
- group_values: &[ArrayRef],
- per_group_order_info: Vec<GroupOrderInfo>,
- allocated: &mut usize,
- ) -> Result<Vec<usize>> {
- // 1.1 construct the key from the group values
- // 1.2 construct the mapping key if it does not exist
- // 1.3 add the row's index to `indices`
-
- // track which entries in `aggr_state` have rows in this batch to aggregate
- let mut groups_with_rows = vec![];
-
- let AggregationState {
- map: row_map,
- ordered_group_states,
- ..
- } = &mut self.aggr_state;
-
- for GroupOrderInfo {
- owned_row,
- hash,
- range,
- } in per_group_order_info
- {
- let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- let ordered_group_state = &ordered_group_states[*group_idx];
- let group_state = &ordered_group_state.group_state;
- owned_row.row() == group_state.group_by_values.row()
- });
-
- match entry {
- // Existing entry for this group value
- Some((_hash, group_idx)) => {
- let group_state = &mut ordered_group_states[*group_idx].group_state;
-
- // 1.3
- if group_state.indices.is_empty() {
- groups_with_rows.push(*group_idx);
- };
- for row in range.start..range.end {
- // remember this row
- group_state.indices.push_accounted(row as u32, allocated);
- }
- }
- // 1.2 Need to create new entry
- None => {
- let accumulator_set =
- aggregates::create_accumulators(&self.normal_aggr_expr)?;
- // Save the value of the ordering columns as Vec<ScalarValue>
- let row = get_row_at_idx(group_values, range.start)?;
- let ordered_columns = self
- .aggregation_ordering
- .order_indices
- .iter()
- .map(|idx| row[*idx].clone())
- .collect::<Vec<_>>();
-
- // Add new entry to group_states and save newly created index
- let group_state = GroupState {
- group_by_values: owned_row,
- aggregation_buffer: vec![
- 0;
- self.row_aggr_layout.fixed_part_width()
- ],
- accumulator_set,
- indices: (range.start as u32..range.end as u32)
- .collect::<Vec<_>>(), // 1.3
- };
- let group_idx = ordered_group_states.len();
-
- // NOTE: do NOT include the `GroupState` struct size in here because this is captured by
- // `ordered_group_states` (see allocation down below)
- *allocated += std::mem::size_of_val(&group_state.group_by_values)
- + (std::mem::size_of::<u8>()
- * group_state.aggregation_buffer.capacity())
- + (std::mem::size_of::<u32>() * group_state.indices.capacity());
-
- // Allocation done by normal accumulators
- *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
- * group_state.accumulator_set.capacity())
- + group_state
- .accumulator_set
- .iter()
- .map(|accu| accu.size())
- .sum::<usize>();
-
- // for hasher function, use precomputed hash value
- row_map.insert_accounted(
- (hash, group_idx),
- |(hash, _group_index)| *hash,
- allocated,
- );
-
- let ordered_group_state = OrderedGroupState {
- group_state,
- ordered_columns,
- status: GroupStatus::GroupInProgress,
- hash,
- };
- ordered_group_states.push_accounted(ordered_group_state, allocated);
-
- groups_with_rows.push(group_idx);
- }
- };
- }
- Ok(groups_with_rows)
- }
-
- // Update the aggr_state according to group_by values (result of group_by_expressions)
- fn update_group_state(
- &mut self,
- group_values: &[ArrayRef],
- allocated: &mut usize,
- ) -> Result<Vec<usize>> {
- // 1.1 construct the key from the group values
- // 1.2 construct the mapping key if it does not exist
- // 1.3 add the row's index to `indices`
-
- // track which entries in `aggr_state` have rows in this batch to aggregate
- let mut groups_with_rows = vec![];
-
- let group_rows = self.row_converter.convert_columns(group_values)?;
- let n_rows = group_rows.num_rows();
- // 1.1 Calculate the group keys for the group values
- let mut batch_hashes = vec![0; n_rows];
- create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-
- let AggregationState {
- map,
- ordered_group_states: group_states,
- ..
- } = &mut self.aggr_state;
-
- for (row, hash) in batch_hashes.into_iter().enumerate() {
- let entry = map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- let group_state = &group_states[*group_idx].group_state;
- group_rows.row(row) == group_state.group_by_values.row()
- });
-
- match entry {
- // Existing entry for this group value
- Some((_hash, group_idx)) => {
- let group_state = &mut group_states[*group_idx].group_state;
-
- // 1.3
- if group_state.indices.is_empty() {
- groups_with_rows.push(*group_idx);
- };
-
- group_state.indices.push_accounted(row as u32, allocated); // remember this row
- }
- // 1.2 Need to create new entry
- None => {
- let accumulator_set =
- aggregates::create_accumulators(&self.normal_aggr_expr)?;
- let row_values = get_row_at_idx(group_values, row)?;
- let ordered_columns = self
- .aggregation_ordering
- .order_indices
- .iter()
- .map(|idx| row_values[*idx].clone())
- .collect::<Vec<_>>();
- let group_state = GroupState {
- group_by_values: group_rows.row(row).owned(),
- aggregation_buffer: vec![
- 0;
- self.row_aggr_layout.fixed_part_width()
- ],
- accumulator_set,
- indices: vec![row as u32], // 1.3
- };
- let group_idx = group_states.len();
-
- // NOTE: do NOT include the `GroupState` struct size in here because this is captured by
- // `group_states` (see allocation down below)
- *allocated += std::mem::size_of_val(&group_state.group_by_values)
- + (std::mem::size_of::<u8>()
- * group_state.aggregation_buffer.capacity())
- + (std::mem::size_of::<u32>() * group_state.indices.capacity());
-
- // Allocation done by normal accumulators
- *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
- * group_state.accumulator_set.capacity())
- + group_state
- .accumulator_set
- .iter()
- .map(|accu| accu.size())
- .sum::<usize>();
-
- // for hasher function, use precomputed hash value
- map.insert_accounted(
- (hash, group_idx),
- |(hash, _group_index)| *hash,
- allocated,
- );
-
- // Add new entry to group_states and save newly created index
- let ordered_group_state = OrderedGroupState {
- group_state,
- ordered_columns,
- status: GroupStatus::GroupInProgress,
- hash,
- };
- group_states.push_accounted(ordered_group_state, allocated);
-
- groups_with_rows.push(group_idx);
- }
- };
- }
- Ok(groups_with_rows)
- }
-
- // Update the accumulator results, according to aggr_state.
- #[allow(clippy::too_many_arguments)]
- fn update_accumulators_using_batch(
- &mut self,
- groups_with_rows: &[usize],
- offsets: &[usize],
- row_values: &[Vec<ArrayRef>],
- normal_values: &[Vec<ArrayRef>],
- row_filter_values: &[Option<ArrayRef>],
- normal_filter_values: &[Option<ArrayRef>],
- allocated: &mut usize,
- ) -> Result<()> {
- // 2.1 for each key in this batch
- // 2.2 for each aggregation
- // 2.3 `slice` from each of its arrays the keys' values
- // 2.4 update / merge the accumulator with the values
- // 2.5 clear indices
- groups_with_rows
- .iter()
- .zip(offsets.windows(2))
- .try_for_each(|(group_idx, offsets)| {
- let group_state =
- &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
- // 2.2
- // Process row accumulators
- self.row_accumulators
- .iter_mut()
- .zip(row_values.iter())
- .zip(row_filter_values.iter())
- .try_for_each(|((accumulator, aggr_array), filter_opt)| {
- let values = slice_and_maybe_filter(
- aggr_array,
- filter_opt.as_ref(),
- offsets,
- )?;
- let mut state_accessor =
- RowAccessor::new_from_layout(self.row_aggr_layout.clone());
- state_accessor
- .point_to(0, group_state.aggregation_buffer.as_mut_slice());
- match self.mode {
- AggregateMode::Partial
- | AggregateMode::Single
- | AggregateMode::SinglePartitioned => {
- accumulator.update_batch(&values, &mut state_accessor)
- }
- AggregateMode::FinalPartitioned | AggregateMode::Final => {
- // note: the aggregation here is over states, not values, thus the merge
- accumulator.merge_batch(&values, &mut state_accessor)
- }
- }
- })?;
- // normal accumulators
- group_state
- .accumulator_set
- .iter_mut()
- .zip(normal_values.iter())
- .zip(normal_filter_values.iter())
- .try_for_each(|((accumulator, aggr_array), filter_opt)| {
- let values = slice_and_maybe_filter(
- aggr_array,
- filter_opt.as_ref(),
- offsets,
- )?;
- let size_pre = accumulator.size();
- let res = match self.mode {
- AggregateMode::Partial
- | AggregateMode::Single
- | AggregateMode::SinglePartitioned => {
- accumulator.update_batch(&values)
- }
- AggregateMode::FinalPartitioned | AggregateMode::Final => {
- // note: the aggregation here is over states, not values, thus the merge
- accumulator.merge_batch(&values)
- }
- };
- let size_post = accumulator.size();
- *allocated += size_post.saturating_sub(size_pre);
- res
- })
- // 2.5
- .and({
- group_state.indices.clear();
- Ok(())
- })
- })?;
- Ok(())
- }
-
- // Update the accumulator results, according to aggr_state.
- fn update_accumulators_using_scalar(
- &mut self,
- groups_with_rows: &[usize],
- row_values: &[Vec<ArrayRef>],
- row_filter_values: &[Option<ArrayRef>],
- ) -> Result<()> {
- let filter_bool_array = row_filter_values
- .iter()
- .map(|filter_opt| match filter_opt {
- Some(f) => Ok(Some(as_boolean_array(f)?)),
- None => Ok(None),
- })
- .collect::<Result<Vec<_>>>()?;
-
- for group_idx in groups_with_rows {
- let group_state =
- &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
- let mut state_accessor =
- RowAccessor::new_from_layout(self.row_aggr_layout.clone());
- state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
- for idx in &group_state.indices {
- for (accumulator, values_array, filter_array) in izip!(
- self.row_accumulators.iter_mut(),
- row_values.iter(),
- filter_bool_array.iter()
- ) {
- if values_array.len() == 1 {
- let scalar_value =
- col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
- accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
- } else {
- let scalar_values = values_array
- .iter()
- .map(|array| {
- col_to_scalar(array, filter_array, *idx as usize)
- })
- .collect::<Result<Vec<_>>>()?;
- accumulator
- .update_scalar_values(&scalar_values, &mut state_accessor)?;
- }
- }
- }
- // clear the group indices in this group
- group_state.indices.clear();
- }
-
- Ok(())
- }
-
- /// Perform group-by aggregation for the given [`RecordBatch`].
- ///
- /// If successful, this returns the additional number of bytes that were allocated during this process.
- ///
- fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
- // Evaluate the grouping expressions:
- let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
- // Keep track of memory allocated:
- let mut allocated = 0usize;
-
- // Evaluate the aggregation expressions.
- // We could evaluate them after the `take`, but since we need to evaluate all
- // of them anyways, it is more performant to do it while they are together.
- let row_aggr_input_values =
- evaluate_many(&self.row_aggregate_expressions, &batch)?;
- let normal_aggr_input_values =
- evaluate_many(&self.normal_aggregate_expressions, &batch)?;
- let row_filter_values = evaluate_optional(&self.row_filter_expressions, &batch)?;
- let normal_filter_values =
- evaluate_optional(&self.normal_filter_expressions, &batch)?;
-
- let row_converter_size_pre = self.row_converter.size();
- for group_values in &group_by_values {
- // If the input is fully sorted on its grouping keys
- let groups_with_rows = if let AggregationOrdering {
- mode: GroupByOrderMode::FullyOrdered,
- order_indices,
- ordering,
- } = &self.aggregation_ordering
- {
- let group_rows = self.row_converter.convert_columns(group_values)?;
- let n_rows = group_rows.num_rows();
- // 1.1 Calculate the group keys for the group values
- let mut batch_hashes = vec![0; n_rows];
- create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
- let sort_column = order_indices
- .iter()
- .enumerate()
- .map(|(idx, cur_idx)| SortColumn {
- values: group_values[*cur_idx].clone(),
- options: Some(ordering[idx].options),
- })
- .collect::<Vec<_>>();
- let n_rows = group_rows.num_rows();
- // determine the boundaries between groups
- let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
- let per_group_order_info = ranges
- .into_iter()
- .map(|range| GroupOrderInfo {
- owned_row: group_rows.row(range.start).owned(),
- hash: batch_hashes[range.start],
- range,
- })
- .collect::<Vec<_>>();
- self.update_fully_ordered_group_state(
- group_values,
- per_group_order_info,
- &mut allocated,
- )?
- } else {
- self.update_group_state(group_values, &mut allocated)?
- };
-
- // Decide the accumulator update mode: use scalar values to update the accumulators when all of the following conditions are met:
- // 1) The aggregation mode is Partial or Single
- // 2) There are no normal aggregation expressions
- // 3) The number of affected groups is high (many entries in `aggr_state` have rows that need updating). This is usually the high cardinality case
- if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
- && normal_aggr_input_values.is_empty()
- && normal_filter_values.is_empty()
- && groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor
- {
- self.update_accumulators_using_scalar(
- &groups_with_rows,
- &row_aggr_input_values,
- &row_filter_values,
- )?;
- } else {
- // Collect all indices + offsets based on keys in this vec
- let mut batch_indices = UInt32Builder::with_capacity(0);
- let mut offsets = vec![0];
- let mut offset_so_far = 0;
- for &group_idx in groups_with_rows.iter() {
- let indices = &self.aggr_state.ordered_group_states[group_idx]
- .group_state
- .indices;
- batch_indices.append_slice(indices);
- offset_so_far += indices.len();
- offsets.push(offset_so_far);
- }
- let batch_indices = batch_indices.finish();
-
- let row_filter_values =
- get_optional_filters(&row_filter_values, &batch_indices);
- let normal_filter_values =
- get_optional_filters(&normal_filter_values, &batch_indices);
- if self.aggregation_ordering.mode == GroupByOrderMode::FullyOrdered {
- self.update_accumulators_using_batch(
- &groups_with_rows,
- &offsets,
- &row_aggr_input_values,
- &normal_aggr_input_values,
- &row_filter_values,
- &normal_filter_values,
- &mut allocated,
- )?;
- } else {
- let row_values =
- get_at_indices(&row_aggr_input_values, &batch_indices)?;
- let normal_values =
- get_at_indices(&normal_aggr_input_values, &batch_indices)?;
- self.update_accumulators_using_batch(
- &groups_with_rows,
- &offsets,
- &row_values,
- &normal_values,
- &row_filter_values,
- &normal_filter_values,
- &mut allocated,
- )?;
- };
- }
- }
- allocated += self
- .row_converter
- .size()
- .saturating_sub(row_converter_size_pre);
-
- let mut new_result = false;
- let last_ordered_columns = self
- .aggr_state
- .ordered_group_states
- .last()
- .map(|item| item.ordered_columns.clone());
-
- if let Some(last_ordered_columns) = last_ordered_columns {
- for cur_group in &mut self.aggr_state.ordered_group_states {
- if cur_group.ordered_columns != last_ordered_columns {
- // This group will no longer receive values. Set status to GroupStatus::CanEmit,
- // meaning we can generate the result for this group.
- cur_group.status = GroupStatus::CanEmit;
- new_result = true;
- }
- }
- }
- if new_result {
- self.exec_state = ExecutionState::ProducingOutput;
- }
-
- Ok(allocated)
- }
-}
-
-/// Tracks the state of the ordered grouping
-#[derive(Debug, PartialEq)]
-enum GroupStatus {
- /// Data for current group is not complete, and new data may yet
- /// arrive.
- GroupInProgress,
- /// Data for current group is completed, and its result can be emitted.
- CanEmit,
- /// Result for the groups has been successfully emitted, and group
- /// state can be pruned.
- Emitted,
-}
-
-/// Information about the order of the state that is built for each
-/// output group.
-#[derive(Debug)]
-pub struct OrderedGroupState {
- /// Aggregate values
- group_state: GroupState,
- /// The actual value of the ordered columns for this group
- ordered_columns: Vec<ScalarValue>,
- /// Can we emit this group?
- status: GroupStatus,
- /// Hash value of the group
- hash: u64,
-}
-
-/// The state of all the groups
-pub struct AggregationState {
- pub reservation: MemoryReservation,
-
- /// Logically maps group values to an index in `ordered_group_states`
- ///
- /// Uses the raw API of hashbrown to avoid actually storing the
- /// keys in the table
- ///
- /// keys: u64 hashes of the GroupValue
- /// values: (hash, index into `ordered_group_states`)
- pub map: RawTable<(u64, usize)>,
-
- /// State for each group
- pub ordered_group_states: Vec<OrderedGroupState>,
-}
-
-impl std::fmt::Debug for AggregationState {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- // hashes are not stored inline, so we could only get values
- let map_string = "RawTable";
- f.debug_struct("AggregationState")
- .field("map", &map_string)
- .field("ordered_group_states", &self.ordered_group_states)
- .finish()
- }
-}
-
-impl BoundedAggregateStream {
- /// Prune the groups from [`AggregationState::ordered_group_states`] which are in
- /// [`GroupStatus::Emitted`].
- ///
- /// Emitted means that the result of this group has already been
- /// emitted, and we are sure that these groups can not receive new
- /// rows.
- fn prune(&mut self) {
- // clear out emitted groups
- let n_partition = self.aggr_state.ordered_group_states.len();
- self.aggr_state
- .ordered_group_states
- .retain(|elem| elem.status != GroupStatus::Emitted);
-
- let n_partition_new = self.aggr_state.ordered_group_states.len();
- let n_pruned = n_partition - n_partition_new;
-
- // update hash table with the new indexes of the remaining groups
- self.aggr_state.map.clear();
- for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() {
- self.aggr_state
- .map
- .insert(item.hash, (item.hash, idx), |(hash, _)| *hash);
- }
- self.row_group_skip_position -= n_pruned;
- }
-
- /// Create a RecordBatch with all group keys and accumulator' states or values.
- fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
- let skip_items = self.row_group_skip_position;
- if skip_items > self.aggr_state.ordered_group_states.len() || self.is_end {
- return Ok(None);
- }
- self.is_end |= skip_items == self.aggr_state.ordered_group_states.len();
- if self.aggr_state.ordered_group_states.is_empty() {
- let schema = self.schema.clone();
- return Ok(Some(RecordBatch::new_empty(schema)));
- }
-
- let end_idx = min(
- skip_items + self.batch_size,
- self.aggr_state.ordered_group_states.len(),
- );
- let group_state_chunk =
- &self.aggr_state.ordered_group_states[skip_items..end_idx];
-
- // Consider only the groups that can be emitted. (The ones we
- // are sure that will not receive new entry.)
- let group_state_chunk = group_state_chunk
- .iter()
- .filter(|item| item.status == GroupStatus::CanEmit)
- .collect::<Vec<_>>();
-
- if group_state_chunk.is_empty() {
- let schema = self.schema.clone();
- return Ok(Some(RecordBatch::new_empty(schema)));
- }
-
- // Buffers for each distinct group (i.e. row accumulator memories)
- let mut state_buffers = group_state_chunk
- .iter()
- .map(|gs| gs.group_state.aggregation_buffer.clone())
- .collect::<Vec<_>>();
-
- let output_fields = self.schema.fields();
- // Store row accumulator results (either final output or intermediate state):
- let row_columns = match self.mode {
- AggregateMode::Partial => {
- read_as_batch(&state_buffers, &self.row_aggr_schema)
- }
- AggregateMode::Final
- | AggregateMode::FinalPartitioned
- | AggregateMode::Single
- | AggregateMode::SinglePartitioned => {
- let mut results = vec![];
- for (idx, acc) in self.row_accumulators.iter().enumerate() {
- let mut state_accessor = RowAccessor::new(&self.row_aggr_schema);
- let current = state_buffers
- .iter_mut()
- .map(|buffer| {
- state_accessor.point_to(0, buffer);
- acc.evaluate(&state_accessor)
- })
- .collect::<Result<Vec<_>>>()?;
- // Get corresponding field for row accumulator
- let field = &output_fields[self.indices[1][idx].start];
- let result = if current.is_empty() {
- Ok(arrow::array::new_empty_array(field.data_type()))
- } else {
- let item = ScalarValue::iter_to_array(current)?;
- // cast output if needed (e.g. for types like Dictionary where
- // the intermediate GroupByScalar type was not the same as the
- // output
- cast(&item, field.data_type())
- }?;
- results.push(result);
- }
- results
- }
- };
-
- // Store normal accumulator results (either final output or intermediate state):
- let mut columns = vec![];
- for (idx, &Range { start, end }) in self.indices[0].iter().enumerate() {
- for (field_idx, field) in output_fields[start..end].iter().enumerate() {
- let current = match self.mode {
- AggregateMode::Partial => ScalarValue::iter_to_array(
- group_state_chunk.iter().map(|group_state| {
- group_state.group_state.accumulator_set[idx]
- .state()
- .map(|v| v[field_idx].clone())
- .expect("Unexpected accumulator state in hash aggregate")
- }),
- ),
- AggregateMode::Final
- | AggregateMode::FinalPartitioned
- | AggregateMode::Single
- | AggregateMode::SinglePartitioned => ScalarValue::iter_to_array(
- group_state_chunk.iter().map(|group_state| {
- group_state.group_state.accumulator_set[idx]
- .evaluate()
- .expect("Unexpected accumulator state in hash aggregate")
- }),
- ),
- }?;
- // Cast output if needed (e.g. for types like Dictionary where
- // the intermediate GroupByScalar type was not the same as the
- // output
- let result = cast(¤t, field.data_type())?;
- columns.push(result);
- }
- }
-
- // Stores the group by fields
- let group_buffers = group_state_chunk
- .iter()
- .map(|gs| gs.group_state.group_by_values.row())
- .collect::<Vec<_>>();
- let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(group_buffers)?;
-
- // The size of the place occupied by row and normal accumulators
- let extra: usize = self
- .indices
- .iter()
- .flatten()
- .map(|Range { start, end }| end - start)
- .sum();
- let empty_arr = new_null_array(&DataType::Null, 1);
- output.extend(std::iter::repeat(empty_arr).take(extra));
-
- // Write results of both accumulator types to the corresponding location in
- // the output schema:
- let results = [columns.into_iter(), row_columns.into_iter()];
- for (outer, mut current) in results.into_iter().enumerate() {
- for &Range { start, end } in self.indices[outer].iter() {
- for item in output.iter_mut().take(end).skip(start) {
- *item = current.next().expect("Columns cannot be empty");
- }
- }
- }
-
- // Set status of the emitted groups to GroupStatus::Emitted mode.
- for gs in self.aggr_state.ordered_group_states[skip_items..end_idx].iter_mut() {
- if gs.status == GroupStatus::CanEmit {
- gs.status = GroupStatus::Emitted;
- }
- }
-
- Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
- }
-}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index b406320dda..9d8ced18da 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -18,8 +18,7 @@
//! Aggregates functionalities
use crate::physical_plan::aggregates::{
- bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
- row_hash::GroupedHashAggregateStream,
+ no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
@@ -46,10 +45,9 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
-mod bounded_aggregate_stream;
mod no_grouping;
+mod order;
mod row_hash;
-mod utils;
pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::is_order_sensitive;
@@ -95,7 +93,7 @@ pub enum AggregateMode {
/// Specifically, each distinct combination of the relevant columns
/// are contiguous in the input, and once a new combination is seen
/// previous combinations are guaranteed never to appear again
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
@@ -218,7 +216,6 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
- BoundedAggregate(BoundedAggregateStream),
}
impl From<StreamType> for SendableRecordBatchStream {
@@ -226,7 +223,6 @@ impl From<StreamType> for SendableRecordBatchStream {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
- StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
}
@@ -725,14 +721,6 @@ impl AggregateExec {
Ok(StreamType::AggregateStream(AggregateStream::new(
self, context, partition,
)?))
- } else if let Some(aggregation_ordering) = &self.aggregation_ordering {
- let aggregation_ordering = aggregation_ordering.clone();
- Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
- self,
- context,
- partition,
- aggregation_ordering,
- )?))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
@@ -1116,6 +1104,7 @@ fn create_accumulators(
.collect::<Result<Vec<_>>>()
}
+#[allow(dead_code)]
fn create_row_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<Vec<RowAccumulatorItem>> {
diff --git a/datafusion/core/src/physical_plan/aggregates/order/full.rs b/datafusion/core/src/physical_plan/aggregates/order/full.rs
new file mode 100644
index 0000000000..d95433a998
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/full.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+
+use crate::physical_expr::EmitTo;
+
+/// Tracks grouping state when the data is ordered entirely by its
+/// group keys
+///
+/// When the group values are sorted, as soon as we see group `n+1` we
+/// know we will never see any rows for group `n` again and thus they
+/// can be emitted.
+///
+/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
+/// by `id` as soon as a new `id` value is seen all previous values
+/// can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+/// ┌─────┐ ┌──────────────────┐
+/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
+/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
+/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
+/// │ ... │ │ ... │ │
+/// │┌───┐│ │ ┌──────────────┐ │ │ current
+/// ││12 ││ │ │ 234 │ │ │
+/// │├───┤│ │ ├──────────────┤ │ │
+/// ││12 ││ │ │ 234 │ │ │
+/// │├───┤│ │ ├──────────────┤ │ │
+/// ││13 ││ │ │ 456 │◀┼───┘
+/// │└───┘│ │ └──────────────┘ │
+/// └─────┘ └──────────────────┘
+///
+/// group indices group_values current tracks the most
+/// (in group value recent group index
+/// order)
+/// ```
+///
+/// In this diagram, the current group is `13`, and thus groups
+/// `0..12` can be emitted. Note that `13` can not yet be emitted as
+/// there may be more values in the next batch with the same group_id.
+#[derive(Debug)]
+pub(crate) struct GroupOrderingFull {
+ state: State,
+ /// Hash values for groups in 0..=current
+ hashes: Vec<u64>,
+}
+
+#[derive(Debug)]
+enum State {
+ /// Seen no input yet
+ Start,
+
+ /// Data is in progress. `current` is the current group for which
+ /// values are being generated. Can emit `current` - 1
+ InProgress { current: usize },
+
+ /// Seen end of input: all groups can be emitted
+ Complete,
+}
+
+impl GroupOrderingFull {
+ pub fn new() -> Self {
+ Self {
+ state: State::Start,
+ hashes: vec![],
+ }
+ }
+
+ // How many groups can be emitted, or None if no data can be emitted
+ pub fn emit_to(&self) -> Option<EmitTo> {
+ match &self.state {
+ State::Start => None,
+ State::InProgress { current, .. } => {
+ if *current == 0 {
+ // Can not emit if still on the first group
+ None
+ } else {
+ // otherwise emit all rows prior to the current group
+ Some(EmitTo::First(*current))
+ }
+ }
+ State::Complete { .. } => Some(EmitTo::All),
+ }
+ }
+
+ /// remove the first n groups from the internal state, shifting
+ /// all existing indexes down by `n`. Returns stored hash values
+ pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+ match &mut self.state {
+ State::Start => panic!("invalid state: start"),
+ State::InProgress { current } => {
+ // shift down by n
+ assert!(*current >= n);
+ *current -= n;
+ self.hashes.drain(0..n);
+ }
+ State::Complete { .. } => panic!("invalid state: complete"),
+ };
+ &self.hashes
+ }
+
+ /// Note that the input is complete so any outstanding groups are done as well
+ pub fn input_done(&mut self) {
+ self.state = State::Complete;
+ }
+
+ /// Called when new groups are added in a batch. See documentation
+ /// on [`super::GroupOrdering::new_groups`]
+ pub fn new_groups(
+ &mut self,
+ group_indices: &[usize],
+ batch_hashes: &[u64],
+ total_num_groups: usize,
+ ) {
+ assert_ne!(total_num_groups, 0);
+ assert_eq!(group_indices.len(), batch_hashes.len());
+
+ // copy any hash values
+ self.hashes.resize(total_num_groups, 0);
+ for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
+ self.hashes[group_index] = hash;
+ }
+
+ // Update state
+ let max_group_index = total_num_groups - 1;
+ self.state = match self.state {
+ State::Start => State::InProgress {
+ current: max_group_index,
+ },
+ State::InProgress { current } => {
+ // expect to see new group indexes when called again
+ assert!(current <= max_group_index, "{current} <= {max_group_index}");
+ State::InProgress {
+ current: max_group_index,
+ }
+ }
+ State::Complete { .. } => {
+ panic!("Saw new group after input was complete");
+ }
+ };
+ }
+
+ pub(crate) fn size(&self) -> usize {
+ std::mem::size_of::<Self>() + self.hashes.allocated_size()
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/order/mod.rs b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
new file mode 100644
index 0000000000..4e1da35319
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_physical_expr::EmitTo;
+
+use super::{AggregationOrdering, GroupByOrderMode};
+
+mod full;
+mod partial;
+
+pub(crate) use full::GroupOrderingFull;
+pub(crate) use partial::GroupOrderingPartial;
+
+/// Ordering information for each group in the hash table
+#[derive(Debug)]
+pub(crate) enum GroupOrdering {
+ /// Groups are not ordered
+ None,
+ /// Groups are ordered by some subset of the group keys
+ Partial(GroupOrderingPartial),
+ /// Groups are entirely contiguous
+ Full(GroupOrderingFull),
+}
+
+impl GroupOrdering {
+ /// Create a `GroupOrdering` for the specified ordering
+ pub fn try_new(
+ input_schema: &Schema,
+ ordering: &AggregationOrdering,
+ ) -> Result<Self> {
+ let AggregationOrdering {
+ mode,
+ order_indices,
+ ordering,
+ } = ordering;
+
+ Ok(match mode {
+ GroupByOrderMode::None => GroupOrdering::None,
+ GroupByOrderMode::PartiallyOrdered => {
+ let partial =
+ GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
+ GroupOrdering::Partial(partial)
+ }
+ GroupByOrderMode::FullyOrdered => {
+ GroupOrdering::Full(GroupOrderingFull::new())
+ }
+ })
+ }
+
+ // How many groups can be emitted, or None if no data can be emitted
+ pub fn emit_to(&self) -> Option<EmitTo> {
+ match self {
+ GroupOrdering::None => None,
+ GroupOrdering::Partial(partial) => partial.emit_to(),
+ GroupOrdering::Full(full) => full.emit_to(),
+ }
+ }
+
+ /// Updates the state to reflect that the input is done
+ pub fn input_done(&mut self) {
+ match self {
+ GroupOrdering::None => {}
+ GroupOrdering::Partial(partial) => partial.input_done(),
+ GroupOrdering::Full(full) => full.input_done(),
+ }
+ }
+
+ /// remove the first n groups from the internal state, shifting
+ /// all existing indexes down by `n`. Returns stored hash values
+ pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+ match self {
+ GroupOrdering::None => &[],
+ GroupOrdering::Partial(partial) => partial.remove_groups(n),
+ GroupOrdering::Full(full) => full.remove_groups(n),
+ }
+ }
+
+ /// Called when new groups are added in a batch
+ ///
+ /// * `total_num_groups`: total number of groups (so max
+ /// group_index is total_num_groups - 1).
+ ///
+ /// * `batch_group_values`: group key values for *each row* in the batch
+ ///
+ /// * `group_indices`: indices for each row in the batch
+ ///
+ /// * `batch_hashes`: hash values for each row in the batch
+ pub fn new_groups(
+ &mut self,
+ batch_group_values: &[ArrayRef],
+ group_indices: &[usize],
+ batch_hashes: &[u64],
+ total_num_groups: usize,
+ ) -> Result<()> {
+ match self {
+ GroupOrdering::None => {}
+ GroupOrdering::Partial(partial) => {
+ partial.new_groups(
+ batch_group_values,
+ group_indices,
+ batch_hashes,
+ total_num_groups,
+ )?;
+ }
+
+ GroupOrdering::Full(full) => {
+ full.new_groups(group_indices, batch_hashes, total_num_groups);
+ }
+ };
+ Ok(())
+ }
+
+ /// Return the size of memory used by the ordering state, in bytes
+ pub(crate) fn size(&self) -> usize {
+ std::mem::size_of::<Self>()
+ + match self {
+ GroupOrdering::None => 0,
+ GroupOrdering::Partial(partial) => partial.size(),
+ GroupOrdering::Full(full) => full.size(),
+ }
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
new file mode 100644
index 0000000000..be8cd59671
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, we will never see groups
+/// with the previous sort key again, so we can emit all groups with
+/// the previous sort key and earlier.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state`, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+/// ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+/// ┌─────┐ ┌───────────────────┐ ┌─────┃ 9 ┃ ┃ "MD" ┃
+/// │┌───┐│ │ ┌──────────────┐ │ │ ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+/// ││ 0 ││ │ │ 123, "MA" │ │ │ current_sort sort_key
+/// │└───┘│ │ └──────────────┘ │ │
+/// │ ... │ │ ... │ │ current_sort tracks the
+/// │┌───┐│ │ ┌──────────────┐ │ │ smallest group index that had
+/// ││ 8 ││ │ │ 765, "MA" │ │ │ the same sort_key as current
+/// │├───┤│ │ ├──────────────┤ │ │
+/// ││ 9 ││ │ │ 923, "MD" │◀─┼─┘
+/// │├───┤│ │ ├──────────────┤ │ ┏━━━━━━━━━━━━━━┓
+/// ││10 ││ │ │ 345, "MD" │ │ ┌─────┃ 11 ┃
+/// │├───┤│ │ ├──────────────┤ │ │ ┗━━━━━━━━━━━━━━┛
+/// ││11 ││ │ │ 124, "MD" │◀─┼──┘ current
+/// │└───┘│ │ └──────────────┘ │
+/// └─────┘ └───────────────────┘
+///
+/// group indices
+/// (in group value group_values current tracks the most
+/// order) recent group index
+///```
+#[derive(Debug)]
+pub(crate) struct GroupOrderingPartial {
+ /// State machine
+ state: State,
+
+ /// The indexes of the group by columns that form the sort key.
+ /// For example if grouping by `id, state` and ordered by `state`
+ /// this would be `[1]`.
+ order_indices: Vec<usize>,
+
+ /// Converter for the sort key (used on the group columns
+ /// specified in `order_indices`)
+ row_converter: RowConverter,
+
+ /// Hash values for groups in 0..=current
+ hashes: Vec<u64>,
+}
+
+#[derive(Debug, Default)]
+enum State {
+ /// The ordering was temporarily taken. `Self::Taken` is left
+ /// when state must be temporarily taken to satisfy the borrow
+ /// checker. If an error happens before the state can be restored,
+ /// the ordering information is lost and execution can not
+ /// proceed, but there is no undefined behavior.
+ #[default]
+ Taken,
+
+ /// Seen no input yet
+ Start,
+
+ /// Data is in progress.
+ InProgress {
+ /// Smallest group index with the sort_key
+ current_sort: usize,
+ /// The sort key of group_index `current_sort`
+ sort_key: OwnedRow,
+ /// index of the current group for which values are being
+ /// generated
+ current: usize,
+ },
+
+ /// Seen end of input, all groups can be emitted
+ Complete,
+}
+
+impl GroupOrderingPartial {
+ pub fn try_new(
+ input_schema: &Schema,
+ order_indices: &[usize],
+ ordering: &[PhysicalSortExpr],
+ ) -> Result<Self> {
+ assert!(!order_indices.is_empty());
+ assert_eq!(order_indices.len(), ordering.len());
+
+ let fields = ordering
+ .iter()
+ .map(|sort_expr| {
+ Ok(SortField::new_with_options(
+ sort_expr.expr.data_type(input_schema)?,
+ sort_expr.options,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Self {
+ state: State::Start,
+ order_indices: order_indices.to_vec(),
+ row_converter: RowConverter::new(fields)?,
+ hashes: vec![],
+ })
+ }
+
+ /// Creates sort keys from the group values
+ ///
+ /// For example, if group_values had `A, B, C` but the input was
+ /// only sorted on `B` and `C` this should return rows for (`B`,
+ /// `C`)
+ fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Result<Rows> {
+ // Take only the columns that are in the sort key
+ let sort_values: Vec<_> = self
+ .order_indices
+ .iter()
+ .map(|&idx| group_values[idx].clone())
+ .collect();
+
+ Ok(self.row_converter.convert_columns(&sort_values)?)
+ }
+
+ /// How many groups be emitted, or None if no data can be emitted
+ pub fn emit_to(&self) -> Option<EmitTo> {
+ match &self.state {
+ State::Taken => unreachable!("State previously taken"),
+ State::Start => None,
+ State::InProgress { current_sort, .. } => {
+ // Can not emit if we are still on the first sort key,
+ // otherwise we can emit all groups that had earlier sort keys
+ //
+ if *current_sort == 0 {
+ None
+ } else {
+ Some(EmitTo::First(*current_sort))
+ }
+ }
+ State::Complete => Some(EmitTo::All),
+ }
+ }
+
+ /// remove the first n groups from the internal state, shifting
+ /// all existing indexes down by `n`. Returns stored hash values
+ pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+ match &mut self.state {
+ State::Taken => unreachable!("State previously taken"),
+ State::Start => panic!("invalid state: start"),
+ State::InProgress {
+ current_sort,
+ current,
+ sort_key: _,
+ } => {
+ // shift indexes down by n
+ assert!(*current >= n);
+ *current -= n;
+ assert!(*current_sort >= n);
+ *current_sort -= n;
+ // Note sort_key stays the same, we are just translating group indexes
+ self.hashes.drain(0..n);
+ }
+ State::Complete { .. } => panic!("invalid state: complete"),
+ };
+ &self.hashes
+ }
+
+ /// Note that the input is complete so any outstanding groups are done as well
+ pub fn input_done(&mut self) {
+ self.state = match self.state {
+ State::Taken => unreachable!("State previously taken"),
+ _ => State::Complete,
+ };
+ }
+
+ /// Called when new groups are added in a batch. See documentation
+ /// on [`super::GroupOrdering::new_groups`]
+ pub fn new_groups(
+ &mut self,
+ batch_group_values: &[ArrayRef],
+ group_indices: &[usize],
+ batch_hashes: &[u64],
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert!(total_num_groups > 0);
+ assert!(!batch_group_values.is_empty());
+ assert_eq!(group_indices.len(), batch_hashes.len());
+
+ let max_group_index = total_num_groups - 1;
+
+ // compute the sort key values for each group
+ let sort_keys = self.compute_sort_keys(batch_group_values)?;
+ assert_eq!(sort_keys.num_rows(), batch_hashes.len());
+
+ let old_state = std::mem::take(&mut self.state);
+ let (mut current_sort, mut sort_key) = match &old_state {
+ State::Taken => unreachable!("State previously taken"),
+ State::Start => (0, sort_keys.row(0)),
+ State::InProgress {
+ current_sort,
+ sort_key,
+ ..
+ } => (*current_sort, sort_key.row()),
+ State::Complete => {
+ panic!("Saw new group after the end of input");
+ }
+ };
+
+ // copy any hash values, and find latest sort key
+ self.hashes.resize(total_num_groups, 0);
+ let iter = group_indices
+ .iter()
+ .zip(batch_hashes.iter())
+ .zip(sort_keys.iter());
+
+ for ((&group_index, &hash), group_sort_key) in iter {
+ self.hashes[group_index] = hash;
+
+ // Has this group seen a new sort_key?
+ if sort_key != group_sort_key {
+ current_sort = group_index;
+ sort_key = group_sort_key;
+ }
+ }
+
+ self.state = State::InProgress {
+ current_sort,
+ sort_key: sort_key.owned(),
+ current: max_group_index,
+ };
+
+ Ok(())
+ }
+
+ /// Return the size of memory allocated by this structure
+ pub(crate) fn size(&self) -> usize {
+ std::mem::size_of::<Self>()
+ + self.order_indices.allocated_size()
+ + self.row_converter.size()
+ + self.hashes.allocated_size()
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index c57f436324..b48e8f38e9 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! Hash aggregation through row format
-//!
-//! POC demonstration of GroupByHashApproach
+//! Hash aggregation
use datafusion_physical_expr::{
- AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+ AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
};
use log::debug;
use std::sync::Arc;
@@ -44,7 +42,7 @@ use arrow::array::*;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryDelta, MemoryReservation};
use datafusion_execution::TaskContext;
use hashbrown::raw::RawTable;
@@ -58,6 +56,7 @@ pub(crate) enum ExecutionState {
Done,
}
+use super::order::GroupOrdering;
use super::AggregateExec;
/// Hash based Grouping Aggregator
@@ -199,6 +198,14 @@ pub(crate) struct GroupedHashAggregateStream {
/// max rows in output RecordBatches
batch_size: usize,
+
+ /// Optional ordering information, that might allow groups to be
+ /// emitted from the hash table prior to seeing the end of the
+ /// input
+ group_ordering: GroupOrdering,
+
+ /// Have we seen the end of the input
+ input_done: bool,
}
impl GroupedHashAggregateStream {
@@ -258,6 +265,16 @@ impl GroupedHashAggregateStream {
let map = RawTable::with_capacity(0);
let group_values = row_converter.empty_rows(0, 0);
+ let group_ordering = agg
+ .aggregation_ordering
+ .as_ref()
+ .map(|aggregation_ordering| {
+ GroupOrdering::try_new(&group_schema, aggregation_ordering)
+ })
+ // return error if any
+ .transpose()?
+ .unwrap_or(GroupOrdering::None);
+
timer.done();
let exec_state = ExecutionState::ReadingInput;
@@ -279,6 +296,8 @@ impl GroupedHashAggregateStream {
baseline_metrics,
random_state: Default::default(),
batch_size,
+ group_ordering,
+ input_done: false,
})
}
}
@@ -303,6 +322,16 @@ fn create_group_accumulator(
}
}
+/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
+macro_rules! extract_ok {
+ ($RES: expr) => {{
+ match $RES {
+ Ok(v) => v,
+ Err(e) => return Poll::Ready(Some(Err(e))),
+ }
+ }};
+}
+
impl Stream for GroupedHashAggregateStream {
type Item = Result<RecordBatch>;
@@ -320,36 +349,33 @@ impl Stream for GroupedHashAggregateStream {
// new batch to aggregate
Some(Ok(batch)) => {
let timer = elapsed_compute.timer();
- let result = self.group_aggregate_batch(batch);
- timer.done();
-
- // allocate memory AFTER we actually used
- // the memory, which simplifies the whole
- // accounting and we are OK with
- // overshooting a bit.
- //
- // Also this means we either store the
- // whole record batch or not.
- let result = result.and_then(|allocated| {
- self.reservation.try_grow(allocated)
- });
-
- if let Err(e) = result {
- return Poll::Ready(Some(Err(e)));
+ // Do the grouping
+ extract_ok!(self.group_aggregate_batch(batch));
+
+ // If we can begin emitting rows, do so,
+ // otherwise keep consuming input
+ assert!(!self.input_done);
+ let to_emit = self.group_ordering.emit_to();
+
+ if let Some(to_emit) = to_emit {
+ let batch =
+ extract_ok!(self.create_batch_from_map(to_emit));
+ self.exec_state = ExecutionState::ProducingOutput(batch);
}
+ timer.done();
+ }
+ Some(Err(e)) => {
+ // inner had error, return to caller
+ return Poll::Ready(Some(Err(e)));
}
- // inner had error, return to caller
- Some(Err(e)) => return Poll::Ready(Some(Err(e))),
- // inner is done, producing output
None => {
+ // inner is done, emit all rows and switch to producing output
+ self.input_done = true;
+ self.group_ordering.input_done();
let timer = elapsed_compute.timer();
- match self.create_batch_from_map() {
- Ok(batch) => {
- self.exec_state =
- ExecutionState::ProducingOutput(batch)
- }
- Err(e) => return Poll::Ready(Some(Err(e))),
- }
+ let batch =
+ extract_ok!(self.create_batch_from_map(EmitTo::All));
+ self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
}
}
@@ -358,7 +384,11 @@ impl Stream for GroupedHashAggregateStream {
ExecutionState::ProducingOutput(batch) => {
// slice off a part of the batch, if needed
let output_batch = if batch.num_rows() <= self.batch_size {
- self.exec_state = ExecutionState::Done;
+ if self.input_done {
+ self.exec_state = ExecutionState::Done;
+ } else {
+ self.exec_state = ExecutionState::ReadingInput
+ }
batch
} else {
// output first batch_size rows
@@ -397,7 +427,7 @@ impl GroupedHashAggregateStream {
fn update_group_state(
&mut self,
group_values: &[ArrayRef],
- allocated: &mut usize,
+ memory_delta: &mut MemoryDelta,
) -> Result<()> {
// Convert the group keys into the row format
// Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
@@ -405,8 +435,7 @@ impl GroupedHashAggregateStream {
let n_rows = group_rows.num_rows();
// track memory used
- let group_values_size_pre = self.group_values.size();
- let scratch_size_pre = self.scratch_space.size();
+ memory_delta.dec(self.state_size());
// tracks to which group each of the input rows belongs
let group_indices = &mut self.scratch_space.current_group_indices;
@@ -418,6 +447,8 @@ impl GroupedHashAggregateStream {
batch_hashes.resize(n_rows, 0);
create_hashes(group_values, &self.random_state, batch_hashes)?;
+ let mut allocated = 0;
+ let starting_num_groups = self.group_values.num_rows();
for (row, &hash) in batch_hashes.iter().enumerate() {
let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
@@ -439,35 +470,40 @@ impl GroupedHashAggregateStream {
self.map.insert_accounted(
(hash, group_idx),
|(hash, _group_index)| *hash,
- allocated,
+ &mut allocated,
);
group_idx
}
};
group_indices.push(group_idx);
}
+ memory_delta.inc(allocated);
+
+ // Update ordering information if necessary
+ let total_num_groups = self.group_values.num_rows();
+ if total_num_groups > starting_num_groups {
+ self.group_ordering.new_groups(
+ group_values,
+ group_indices,
+ batch_hashes,
+ total_num_groups,
+ )?;
+ }
- // account for memory growth in scratch space
- *allocated += self.scratch_space.size();
- *allocated -= scratch_size_pre; // subtract after adding to avoid underflow
-
- // account for any memory increase used to store group_values
- *allocated += self.group_values.size();
- *allocated -= group_values_size_pre; // subtract after adding to avoid underflow
+ // account for memory change
+ memory_delta.inc(self.state_size());
Ok(())
}
/// Perform group-by aggregation for the given [`RecordBatch`].
- ///
- /// If successful, returns the additional amount of memory, in
- /// bytes, that were allocated during this process.
- fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
+ fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
// Evaluate the grouping expressions
let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
// Keep track of memory allocated:
- let mut allocated = 0usize;
+ let mut memory_delta = MemoryDelta::new();
+ memory_delta.dec(self.state_size());
// Evaluate the aggregation expressions.
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
@@ -475,11 +511,9 @@ impl GroupedHashAggregateStream {
// Evaluate the filter expressions, if any, against the inputs
let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
- let row_converter_size_pre = self.row_converter.size();
-
for group_values in &group_by_values {
// calculate the group indices for each input row
- self.update_group_state(group_values, &mut allocated)?;
+ self.update_group_state(group_values, &mut memory_delta)?;
let group_indices = &self.scratch_space.current_group_indices;
// Gather the inputs to call the actual accumulator
@@ -492,7 +526,7 @@ impl GroupedHashAggregateStream {
let total_num_groups = self.group_values.num_rows();
for ((acc, values), opt_filter) in t {
- let acc_size_pre = acc.size();
+ memory_delta.dec(acc.size());
let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
// Call the appropriate method on each aggregator with
@@ -519,40 +553,100 @@ impl GroupedHashAggregateStream {
)?;
}
}
-
- allocated += acc.size();
- allocated -= acc_size_pre;
+ memory_delta.inc(acc.size());
}
}
- allocated += self.row_converter.size();
- allocated -= row_converter_size_pre;
+ memory_delta.inc(self.state_size());
- Ok(allocated)
+ // Update allocation AFTER it is used, simplifying accounting,
+ // though it results in a temporary overshoot.
+ memory_delta.update(&mut self.reservation)
}
- /// Create an output RecordBatch with all group keys and accumulator states/values
- fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
+ /// Create an output RecordBatch with the group keys and
+ /// accumulator states/values specified in emit_to
+ fn create_batch_from_map(&mut self, emit_to: EmitTo) -> Result<RecordBatch> {
if self.group_values.num_rows() == 0 {
- let schema = self.schema.clone();
- return Ok(RecordBatch::new_empty(schema));
+ return Ok(RecordBatch::new_empty(self.schema()));
}
+ let output = self.build_output(emit_to)?;
+ self.remove_emitted(emit_to)?;
+ let batch = RecordBatch::try_new(self.schema(), output)?;
+ Ok(batch)
+ }
+
+ /// Creates output: `(group 1, group 2, ... agg 1, agg 2, ...)`
+ fn build_output(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// First output rows are the groups
- let groups_rows = self.group_values.iter();
- let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;
+ let mut output: Vec<ArrayRef> = match emit_to {
+ EmitTo::All => {
+ let groups_rows = self.group_values.iter();
+ self.row_converter.convert_rows(groups_rows)?
+ }
+ EmitTo::First(n) => {
+ let groups_rows = self.group_values.iter().take(n);
+ self.row_converter.convert_rows(groups_rows)?
+ }
+ };
- // Next output each aggregate value, from the accumulators
+ // Next output each aggregate value
for acc in self.accumulators.iter_mut() {
match self.mode {
- AggregateMode::Partial => output.extend(acc.state()?),
+ AggregateMode::Partial => output.extend(acc.state(emit_to)?),
AggregateMode::Final
| AggregateMode::FinalPartitioned
| AggregateMode::Single
- | AggregateMode::SinglePartitioned => output.push(acc.evaluate()?),
+ | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?),
}
}
- Ok(RecordBatch::try_new(self.schema.clone(), output)?)
+ Ok(output)
+ }
+
+ /// Removes the first `n` groups, adjusting all group_indices
+ /// appropriately
+ fn remove_emitted(&mut self, emit_to: EmitTo) -> Result<()> {
+ let mut memory_delta = MemoryDelta::new();
+ memory_delta.dec(self.state_size());
+
+ match emit_to {
+ EmitTo::All => {
+ // Eventually we may also want to clear the hash table here
+ //self.map.clear();
+ }
+ EmitTo::First(n) => {
+ // Clear out first n group keys by copying them to a new Rows.
+ // TODO file some ticket in arrow-rs to make this more efficient?
+ let mut new_group_values = self.row_converter.empty_rows(0, 0);
+ for row in self.group_values.iter().skip(n) {
+ new_group_values.push(row);
+ }
+ std::mem::swap(&mut new_group_values, &mut self.group_values);
+
+ // rebuild hash table (maybe we should remove the
+ // entries for each group that was emitted rather than
+ // rebuilding the whole thing)
+
+ let hashes = self.group_ordering.remove_groups(n);
+ assert_eq!(hashes.len(), self.group_values.num_rows());
+ self.map.clear();
+ for (idx, &hash) in hashes.iter().enumerate() {
+ self.map.insert(hash, (hash, idx), |(hash, _)| *hash);
+ }
+ }
+ };
+ // account for memory change
+ memory_delta.inc(self.state_size());
+ memory_delta.update(&mut self.reservation)
+ }
+
+ /// return the current size stored by variable state in this structure
+ fn state_size(&self) -> usize {
+ self.group_values.size()
+ + self.scratch_space.size()
+ + self.group_ordering.size()
+ + self.row_converter.size()
}
}
diff --git a/datafusion/core/src/physical_plan/aggregates/utils.rs b/datafusion/core/src/physical_plan/aggregates/utils.rs
deleted file mode 100644
index a55464edd1..0000000000
--- a/datafusion/core/src/physical_plan/aggregates/utils.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This file contains various utility functions that are common to both
-//! batch and streaming aggregation code.
-
-use crate::physical_plan::aggregates::AccumulatorItem;
-use arrow::compute;
-use arrow::compute::filter;
-use arrow::row::OwnedRow;
-use arrow_array::types::UInt32Type;
-use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
-use arrow_schema::{Schema, SchemaRef};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::AggregateExpr;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
-use std::sync::Arc;
-
-/// This object encapsulates the state that is built for each output group.
-#[derive(Debug)]
-pub(crate) struct GroupState {
- /// The actual group by values, stored sequentially
- pub group_by_values: OwnedRow,
-
- // Accumulator state, stored sequentially
- pub aggregation_buffer: Vec<u8>,
-
- // Accumulator state, one for each aggregate that doesn't support row accumulation
- pub accumulator_set: Vec<AccumulatorItem>,
-
- /// Scratch space used to collect indices for input rows in a
- /// batch that have values to aggregate, reset on each batch.
- pub indices: Vec<u32>,
-}
-
-#[derive(Debug)]
-/// This object tracks the aggregation phase.
-pub(crate) enum ExecutionState {
- ReadingInput,
- ProducingOutput,
- Done,
-}
-
-pub(crate) fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> SchemaRef {
- let fields = aggr_expr
- .iter()
- .flat_map(|expr| expr.state_fields().unwrap().into_iter())
- .collect::<Vec<_>>();
- Arc::new(Schema::new(fields))
-}
-
-pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
- let mut output = MutableRecordBatch::new(rows.len(), Arc::new(schema.clone()));
- let mut row = RowReader::new(schema);
-
- for data in rows {
- row.point_to(0, data);
- read_row(&row, &mut output, schema);
- }
-
- output.output_as_columns()
-}
-
-pub(crate) fn get_at_indices(
- input_values: &[Vec<ArrayRef>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
- input_values
- .iter()
- .map(|array| get_arrayref_at_indices(array, batch_indices))
- .collect()
-}
-
-pub(crate) fn get_optional_filters(
- original_values: &[Option<Arc<dyn Array>>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
- original_values
- .iter()
- .map(|array| {
- array.as_ref().map(|array| {
- compute::take(
- array.as_ref(),
- batch_indices,
- None, // None: no index check
- )
- .unwrap()
- })
- })
- .collect()
-}
-
-pub(crate) fn slice_and_maybe_filter(
- aggr_array: &[ArrayRef],
- filter_opt: Option<&Arc<dyn Array>>,
- offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
- let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
- let sliced_arrays: Vec<ArrayRef> = aggr_array
- .iter()
- .map(|array| array.slice(offset, length))
- .collect();
-
- if let Some(f) = filter_opt {
- let sliced = f.slice(offset, length);
- let filter_array = as_boolean_array(&sliced)?;
-
- sliced_arrays
- .iter()
- .map(|array| filter(array, filter_array).map_err(DataFusionError::ArrowError))
- .collect()
- } else {
- Ok(sliced_arrays)
- }
-}
-
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)].
-pub(crate) fn col_to_scalar(
- array: &ArrayRef,
- filter: &Option<&BooleanArray>,
- row_index: usize,
-) -> Result<ScalarValue> {
- if array.is_null(row_index) {
- return Ok(ScalarValue::Null);
- }
- if let Some(filter) = filter {
- if !filter.value(row_index) {
- return Ok(ScalarValue::Null);
- }
- }
- ScalarValue::try_from_array(array, row_index)
-}
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 74dd9ee1d1..e27c133412 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -108,9 +108,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(expr);
- println!("aggregate_expr: {aggregate_expr:?}");
- println!("group_by: {group_by:?}");
-
let aggregate_exec_running = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
@@ -170,6 +167,8 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
(i, usual_line),
(i, running_line),
"Inconsistent result\n\n\
+ Aggregate_expr: {aggregate_expr:?}\n\
+ group_by: {group_by:?}\n\
Left Plan:\n{}\n\
Right Plan:\n{}\n\
schema:\n{schema}\n\
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index d002cda8d8..fe077524a4 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -18,7 +18,7 @@
//! Manages all available memory during query execution
use datafusion_common::Result;
-use std::sync::Arc;
+use std::{cmp::Ordering, sync::Arc};
mod pool;
pub mod proxy;
@@ -157,7 +157,6 @@ impl MemoryReservation {
/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
- use std::cmp::Ordering;
match capacity.cmp(&self.size) {
Ordering::Greater => self.grow(capacity - self.size),
Ordering::Less => self.shrink(self.size - capacity),
@@ -167,7 +166,6 @@ impl MemoryReservation {
/// Try to set the size of this reservation to `capacity`
pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
- use std::cmp::Ordering;
match capacity.cmp(&self.size) {
Ordering::Greater => self.try_grow(capacity - self.size)?,
Ordering::Less => self.shrink(self.size - capacity),
@@ -221,6 +219,49 @@ pub fn human_readable_size(size: usize) -> String {
format!("{value:.1} {unit}")
}
+/// Tracks the change in memory to avoid overflow. Typically, this
+/// is used like the following:
+///
+/// 1. Call `delta.dec(sized_thing.size())`
+///
+/// 2. potentially change size of `sized_thing`
+///
+/// 3. Call `delta.inc(sized_thing.size())`
+#[derive(Debug, Default)]
+pub struct MemoryDelta {
+ decrease: usize,
+ increase: usize,
+}
+
+impl MemoryDelta {
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// record size being 'decremented'. This is used to record the
+ /// initial size of some allocation prior to change
+ pub fn dec(&mut self, sz: usize) {
+ self.decrease += sz;
+ }
+
+ /// record size being 'incremented'. This is used to record
+ /// the final size of some object.
+ pub fn inc(&mut self, sz: usize) {
+ self.increase += sz;
+ }
+
+ /// Adjusts the reservation with the delta used / freed
+ pub fn update(self, reservation: &mut MemoryReservation) -> Result<()> {
+ let Self { decrease, increase } = self;
+ match increase.cmp(&decrease) {
+ Ordering::Less => reservation.shrink(decrease - increase),
+ Ordering::Equal => {}
+ Ordering::Greater => reservation.try_grow(increase - decrease)?,
+ };
+ Ok(())
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index e95e9fcf87..6ad0f94e03 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -48,6 +48,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
+use super::groups_accumulator::EmitTo;
use super::utils::{adjust_output_array, Decimal128Averager};
/// AVG aggregate expression
@@ -560,10 +561,10 @@ where
Ok(())
}
- fn evaluate(&mut self) -> Result<ArrayRef> {
- let counts = std::mem::take(&mut self.counts);
- let sums = std::mem::take(&mut self.sums);
- let nulls = self.null_state.build();
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let counts = emit_to.take_needed(&mut self.counts);
+ let sums = emit_to.take_needed(&mut self.sums);
+ let nulls = self.null_state.build(emit_to);
assert_eq!(nulls.len(), sums.len());
assert_eq!(counts.len(), sums.len());
@@ -598,12 +599,14 @@ where
}
// return arrays for sums and counts
- fn state(&mut self) -> Result<Vec<ArrayRef>> {
- let nulls = Some(self.null_state.build());
- let counts = std::mem::take(&mut self.counts);
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let nulls = self.null_state.build(emit_to);
+ let nulls = Some(nulls);
+
+ let counts = emit_to.take_needed(&mut self.counts);
let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy
- let sums = std::mem::take(&mut self.sums);
+ let sums = emit_to.take_needed(&mut self.sums);
let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index e0b9ffd81a..60e15a673a 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -41,6 +41,7 @@ use datafusion_row::accessor::RowAccessor;
use crate::expressions::format_state_name;
use super::groups_accumulator::accumulate::accumulate_indices;
+use super::groups_accumulator::EmitTo;
/// COUNT aggregate expression
/// Returns the amount of non-null values of the given expression.
@@ -171,8 +172,8 @@ impl GroupsAccumulator for CountGroupsAccumulator {
Ok(())
}
- fn evaluate(&mut self) -> Result<ArrayRef> {
- let counts = std::mem::take(&mut self.counts);
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let counts = emit_to.take_needed(&mut self.counts);
// Count is always non null (null inputs just don't contribute to the overall values)
let nulls = None;
@@ -182,8 +183,8 @@ impl GroupsAccumulator for CountGroupsAccumulator {
}
// return arrays for counts
- fn state(&mut self) -> Result<Vec<ArrayRef>> {
- let counts = std::mem::take(&mut self.counts);
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let counts = emit_to.take_needed(&mut self.counts);
let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls
Ok(vec![Arc::new(counts) as ArrayRef])
}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index bcc9d30bed..596265a737 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -21,7 +21,9 @@
use arrow::datatypes::ArrowPrimitiveType;
use arrow_array::{Array, BooleanArray, PrimitiveArray};
-use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};
+
+use crate::EmitTo;
/// Track the accumulator null state per row: if any values for that
/// group were null and if any values have been seen at all for that group.
@@ -317,13 +319,30 @@ impl NullState {
}
}
- /// Creates the final [`NullBuffer`] representing which
- /// group_indices should have null values (because they never saw
- /// any values)
+ /// Creates a [`NullBuffer`] representing which group_indices
+ /// should have null values (because they never saw any values)
+ /// for the `emit_to` rows.
///
- /// resets the internal state to empty
- pub fn build(&mut self) -> NullBuffer {
- NullBuffer::new(self.seen_values.finish())
+ /// resets the internal state appropriately
+ pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+ let nulls: BooleanBuffer = self.seen_values.finish();
+
+ let nulls = match emit_to {
+ EmitTo::All => nulls,
+ EmitTo::First(n) => {
+ // split off the first N values in seen_values
+ //
+ // TODO make this more efficient rather than two
+ // copies and bitwise manipulation
+ let first_n_null: BooleanBuffer = nulls.iter().take(n).collect();
+ // reset the existing seen buffer
+ for seen in nulls.iter().skip(n) {
+ self.seen_values.append(seen);
+ }
+ first_n_null
+ }
+ };
+ NullBuffer::new(nulls)
}
}
@@ -689,7 +708,7 @@ mod test {
// Validate the final buffer (one value per group)
let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
- let null_buffer = null_state.build();
+ let null_buffer = null_state.build(EmitTo::All);
assert_eq!(null_buffer, expected_null_buffer);
}
@@ -806,7 +825,7 @@ mod test {
// Validate the final buffer (one value per group)
let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
- let null_buffer = null_state.build();
+ let null_buffer = null_state.build(EmitTo::All);
assert_eq!(null_buffer, expected_null_buffer);
}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
index 7b4c61fe7d..dcc8c37e74 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -17,7 +17,7 @@
//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
-use super::GroupsAccumulator;
+use super::{EmitTo, GroupsAccumulator};
use arrow::{
array::{AsArray, UInt32Builder},
compute,
@@ -44,8 +44,10 @@ pub struct GroupsAccumulatorAdapter {
/// Current memory usage, in bytes.
///
- /// Note this is incrementally updated to avoid size() being a
- /// bottleneck, which we saw in earlier implementations.
+ /// Note this is incrementally updated with deltas to avoid the
+ /// call to size() being a bottleneck. We saw size() being a
+ /// bottleneck in earlier implementations when there were many
+ /// distinct groups.
allocation_bytes: usize,
}
@@ -71,7 +73,7 @@ impl AccumulatorState {
fn size(&self) -> usize {
self.accumulator.size()
+ std::mem::size_of_val(self)
- + std::mem::size_of::<u32>() * self.indices.capacity()
+ + self.indices.allocated_size()
}
}
@@ -82,40 +84,29 @@ impl GroupsAccumulatorAdapter {
where
F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
{
- let mut new_self = Self {
+ Self {
factory: Box::new(factory),
states: vec![],
allocation_bytes: 0,
- };
- new_self.reset_allocation();
- new_self
- }
-
- // Reset the allocation bytes to empty state
- fn reset_allocation(&mut self) {
- assert!(self.states.is_empty());
- self.allocation_bytes = std::mem::size_of::<GroupsAccumulatorAdapter>();
+ }
}
/// Ensure that self.accumulators has total_num_groups
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
// can't shrink
assert!(total_num_groups >= self.states.len());
- let vec_size_pre =
- std::mem::size_of::<AccumulatorState>() * self.states.capacity();
+ let vec_size_pre = self.states.allocated_size();
// instantiate new accumulators
let new_accumulators = total_num_groups - self.states.len();
for _ in 0..new_accumulators {
let accumulator = (self.factory)()?;
let state = AccumulatorState::new(accumulator);
- self.allocation_bytes += state.size();
+ self.add_allocation(state.size());
self.states.push(state);
}
- self.allocation_bytes +=
- std::mem::size_of::<AccumulatorState>() * self.states.capacity();
- self.allocation_bytes -= vec_size_pre;
+ self.adjust_allocation(vec_size_pre, self.states.allocated_size());
Ok(())
}
@@ -204,9 +195,11 @@ impl GroupsAccumulatorAdapter {
// RecordBatch(es)
let iter = groups_with_rows.iter().zip(offsets.windows(2));
+ let mut sizes_pre = 0;
+ let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
let state = &mut self.states[group_idx];
- let size_pre = state.size();
+ sizes_pre += state.size();
let values_to_accumulate =
slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
@@ -215,12 +208,40 @@ impl GroupsAccumulatorAdapter {
// clear out the state so they are empty for next
// iteration
state.indices.clear();
-
- self.allocation_bytes += state.size();
- self.allocation_bytes -= size_pre;
+ sizes_post += state.size();
}
+
+ self.adjust_allocation(sizes_pre, sizes_post);
Ok(())
}
+
+ /// Increment the allocation by `size`
+ ///
+ /// See [`Self::allocation_bytes`] for rationale.
+ fn add_allocation(&mut self, size: usize) {
+ self.allocation_bytes += size;
+ }
+
+ /// Decrease the allocation by `size`
+ ///
+ /// See [`Self::allocation_bytes`] for rationale.
+ fn free_allocation(&mut self, size: usize) {
+ // use saturating sub to avoid errors if the accumulators
+ // report erroneous sizes
+ self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
+ }
+
+ /// Adjusts the allocation for something that started with
+ /// `old_size` and now has `new_size`, avoiding overflow
+ ///
+ /// See [`Self::allocation_bytes`] for rationale.
+ fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
+ if new_size > old_size {
+ self.add_allocation(new_size - old_size)
+ } else {
+ self.free_allocation(old_size - new_size)
+ }
+ }
}
impl GroupsAccumulator for GroupsAccumulatorAdapter {
@@ -243,27 +264,36 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
Ok(())
}
- fn evaluate(&mut self) -> Result<ArrayRef> {
- let states = std::mem::take(&mut self.states);
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let vec_size_pre = self.states.allocated_size();
+
+ let states = emit_to.take_needed(&mut self.states);
let results: Vec<ScalarValue> = states
.into_iter()
- .map(|state| state.accumulator.evaluate())
+ .map(|state| {
+ self.free_allocation(state.size());
+ state.accumulator.evaluate()
+ })
.collect::<Result<_>>()?;
let result = ScalarValue::iter_to_array(results);
- self.reset_allocation();
+
+ self.adjust_allocation(vec_size_pre, self.states.allocated_size());
+
result
}
- fn state(&mut self) -> Result<Vec<ArrayRef>> {
- let states = std::mem::take(&mut self.states);
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let vec_size_pre = self.states.allocated_size();
+ let states = emit_to.take_needed(&mut self.states);
// each accumulator produces a potential vector of values
// which we need to form into columns
let mut results: Vec<Vec<ScalarValue>> = vec![];
for state in states {
+ self.free_allocation(state.size());
let accumulator_state = state.accumulator.state()?;
results.resize_with(accumulator_state.len(), Vec::new);
for (idx, state_val) in accumulator_state.into_iter().enumerate() {
@@ -284,8 +314,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
assert_eq!(arr.len(), first_col.len())
}
}
+ self.adjust_allocation(vec_size_pre, self.states.allocated_size());
- self.reset_allocation();
Ok(arrays)
}
@@ -302,7 +332,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
opt_filter,
total_num_groups,
|accumulator, values_to_accumulate| {
- accumulator.merge_batch(values_to_accumulate)
+ accumulator.merge_batch(values_to_accumulate)?;
+ Ok(())
},
)?;
Ok(())
@@ -313,6 +344,23 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
}
}
+/// Extension trait for [`Vec`] to account for allocations.
+pub trait VecAllocExt {
+ /// Item type.
+ type T;
+ /// Return the amount of memory allocated by this Vec (not
+ /// recursively counting any heap allocations contained within the
+ /// structure). Does not include the size of `self`
+ fn allocated_size(&self) -> usize;
+}
+
+impl<T> VecAllocExt for Vec<T> {
+ type T = T;
+ fn allocated_size(&self) -> usize {
+ std::mem::size_of::<T>() * self.capacity()
+ }
+}
+
fn get_filter_at_indices(
opt_filter: Option<&BooleanArray>,
indices: &PrimitiveArray<UInt32Type>,
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
index 83ffc3717b..21b6cc29e8 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
@@ -19,12 +19,12 @@ use std::sync::Arc;
use arrow::array::AsArray;
use arrow_array::{ArrayRef, BooleanArray};
-use arrow_buffer::BooleanBufferBuilder;
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use datafusion_common::Result;
use crate::GroupsAccumulator;
-use super::accumulate::NullState;
+use super::{accumulate::NullState, EmitTo};
/// An accumulator that implements a single operation over a
/// [`BooleanArray`] where the accumulated state is also boolean (such
@@ -98,15 +98,28 @@ where
Ok(())
}
- fn evaluate(&mut self) -> Result<ArrayRef> {
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let values = self.values.finish();
- let nulls = self.null_state.build();
+
+ let values = match emit_to {
+ EmitTo::All => values,
+ EmitTo::First(n) => {
+ let first_n: BooleanBuffer = values.iter().take(n).collect();
+ // put the values after the first n back into self.values
+ for v in values.iter().skip(n) {
+ self.values.append(v);
+ }
+ first_n
+ }
+ };
+
+ let nulls = self.null_state.build(emit_to);
let values = BooleanArray::new(values, Some(nulls));
Ok(Arc::new(values))
}
- fn state(&mut self) -> Result<Vec<ArrayRef>> {
- self.evaluate().map(|arr| vec![arr])
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ self.evaluate(emit_to).map(|arr| vec![arr])
}
fn merge_batch(
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index 49d62e7a93..d2e64d373b 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -27,6 +27,42 @@ pub(crate) mod prim_op;
use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;
+/// Describes how many rows should be emitted during grouping.
+#[derive(Debug, Clone, Copy)]
+pub enum EmitTo {
+ /// Emit all groups
+ All,
+ /// Emit only the first `n` groups and shift all existing group
+ /// indexes down by `n`.
+ ///
+ /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
+ /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
+ First(usize),
+}
+
+impl EmitTo {
+ /// Removes the number of rows from `v` required to emit the right
+ /// number of rows, returning a `Vec` with elements taken, and the
+ /// remaining values in `v`.
+ ///
+ /// This avoids copying if Self::All
+ pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
+ match self {
+ Self::All => {
+ // Take the entire vector, leave new (empty) vector
+ std::mem::take(v)
+ }
+ Self::First(n) => {
+ // split off everything after the first n values into t
+ let mut t = v.split_off(*n);
+ // swap so the remaining values stay in v and t holds the first n
+ std::mem::swap(v, &mut t);
+ t
+ }
+ }
+ }
+}
+
/// `GroupAccumulator` implements a single aggregate (e.g. AVG) and
/// stores the state for *all* groups internally.
///
@@ -72,28 +108,30 @@ pub trait GroupsAccumulator: Send {
/// each group, and `evaluate` will produce that running sum as
/// its output for all groups, in group_index order
///
- /// The accumulator should free to release / reset it is internal
- /// state after this call to the same as it was after being
- /// initially created.
- fn evaluate(&mut self) -> Result<ArrayRef>;
+ /// If `emit_to` is [`EmitTo::All`], the accumulator should
+ /// return all groups and release / reset its internal state
+ /// equivalent to when it was first created.
+ ///
+ /// If `emit_to` is [`EmitTo::First`], only the first `n` groups
+ /// should be emitted and the state for those first groups
+ /// removed. State for the remaining groups must be retained for
+ /// future use. The group_indices on subsequent calls to
+ /// `update_batch` or `merge_batch` will be shifted down by
+ /// `n`. See [`EmitTo::First`] for more details.
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
- /// The rows returned *must* be in group_index order: The value
- /// for group_index 0, followed by 1, etc. Any group_index that
- /// did not have values, should be null.
- ///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
- /// The accumulator should free to release / reset its internal
- /// state after this call to the same as it was after being
- /// initially created.
- fn state(&mut self) -> Result<Vec<ArrayRef>>;
+ /// See [`Self::evaluate`] for details on the required output
+ /// order and `emit_to`.
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
index a49651a5e3..adeaea712c 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -24,7 +24,7 @@ use datafusion_common::Result;
use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator};
-use super::accumulate::NullState;
+use super::{accumulate::NullState, EmitTo};
/// An accumulator that implements a single operation over
/// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -112,16 +112,16 @@ where
Ok(())
}
- fn evaluate(&mut self) -> Result<ArrayRef> {
- let values = std::mem::take(&mut self.values);
- let nulls = self.null_state.build();
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let values = emit_to.take_needed(&mut self.values);
+ let nulls = self.null_state.build(emit_to);
let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); // no copy
adjust_output_array(&self.data_type, Arc::new(values))
}
- fn state(&mut self) -> Result<Vec<ArrayRef>> {
- self.evaluate().map(|arr| vec![arr])
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ self.evaluate(emit_to).map(|arr| vec![arr])
}
fn merge_batch(
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 165faba442..faa805dc92 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -46,8 +46,9 @@ pub mod utils;
pub mod var_provider;
pub mod window;
-// reexport this to maintain compatibility with anything that used from_slice previously
-pub use aggregate::groups_accumulator::{GroupsAccumulator, GroupsAccumulatorAdapter};
+pub use aggregate::groups_accumulator::{
+ EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
pub use aggregate::AggregateExpr;
pub use equivalence::{