You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/27 14:13:10 UTC
[arrow-datafusion] branch main updated: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new aec34205e9 Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
aec34205e9 is described below
commit aec34205e94728b6975551933e99c1d874806a13
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Apr 27 17:13:01 2023 +0300
Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
* add starting code for experimenting
* stream group by linear implementation
* sorted implementation
* minor changes
* simplifications
* Simplifications
* convert vec to Option
* minor changes
* minor changes
* minor changes
* simplifications
* minor changes
* all tests pass
* refactor
* simplifications
* remove unnecessary code
* simplifications
* minor changes
* simplifications
* minor changes
* Simplify the GroupByOrderMode type
* Address reviews
* separate fully ordered case and remaining cases
* change test data type
* address reviews
* Convert to option
* revert to the old API.
* Code quality: stylistic changes
* Separate bounded stream and hash stream
* Update comments
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../{row_hash.rs => bounded_aggregate_stream.rs} | 518 ++++++++++++++-------
.../core/src/physical_plan/aggregates/mod.rs | 291 ++++++++++--
.../core/src/physical_plan/aggregates/row_hash.rs | 140 +-----
.../core/src/physical_plan/aggregates/utils.rs | 151 ++++++
datafusion/core/src/physical_plan/analyze.rs | 2 +-
.../core/src/physical_plan/coalesce_batches.rs | 2 +-
.../core/src/physical_plan/coalesce_partitions.rs | 2 +-
datafusion/core/src/physical_plan/filter.rs | 2 +-
.../core/src/physical_plan/joins/cross_join.rs | 2 +-
.../core/src/physical_plan/joins/hash_join.rs | 2 +-
.../src/physical_plan/joins/symmetric_hash_join.rs | 5 +-
datafusion/core/src/physical_plan/mod.rs | 2 +-
datafusion/core/src/physical_plan/projection.rs | 2 +-
.../core/src/physical_plan/repartition/mod.rs | 2 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 2 +-
datafusion/core/src/physical_plan/union.rs | 4 +-
datafusion/core/src/physical_plan/unnest.rs | 2 +-
.../windows/bounded_window_agg_exec.rs | 3 +-
.../src/physical_plan/windows/window_agg_exec.rs | 2 +-
datafusion/core/tests/aggregate_fuzz.rs | 222 +++++++++
datafusion/core/tests/sql/group_by.rs | 97 ++++
.../core/tests/sqllogictests/test_files/window.slt | 33 +-
datafusion/core/tests/window_fuzz.rs | 4 +
23 files changed, 1118 insertions(+), 374 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
similarity index 66%
copy from datafusion/core/src/physical_plan/aggregates/row_hash.rs
copy to datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index bf1846ae98..127821de45 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! Hash aggregation through row format
+//! Hash aggregation on ordered GROUP BY expressions.
+//! The generated output itself has an ordering, and the executor can run
+//! with bounded memory (it can generate results in streaming cases).
use std::cmp::min;
use std::ops::Range;
@@ -24,35 +26,38 @@ use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
-use arrow::row::{OwnedRow, RowConverter, SortField};
-use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};
+use hashbrown::raw::RawTable;
+use itertools::izip;
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::{
- evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
- AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
+ evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+ AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use arrow::array::*;
-use arrow::compute::{cast, filter};
-use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
+
+use crate::physical_plan::aggregates::utils::{
+ aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
+ read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
+};
+use arrow::array::{new_null_array, ArrayRef, UInt32Builder};
+use arrow::compute::{cast, SortColumn};
+use arrow::datatypes::DataType;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
+use datafusion_physical_expr::hash_utils::create_hashes;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
-use hashbrown::raw::RawTable;
-use itertools::izip;
/// Grouping aggregate with row-format aggregation states inside.
///
@@ -70,7 +75,7 @@ use itertools::izip;
///
/// [Arrow-row]: OwnedRow
/// [WordAligned]: datafusion_row::layout
-pub(crate) struct GroupedHashAggregateStream {
+pub(crate) struct BoundedAggregateStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
mode: AggregateMode,
@@ -105,26 +110,12 @@ pub(crate) struct GroupedHashAggregateStream {
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
+ aggregation_ordering: AggregationOrdering,
+ is_end: bool,
}
-#[derive(Debug)]
-/// tracks what phase the aggregation is in
-enum ExecutionState {
- ReadingInput,
- ProducingOutput,
- Done,
-}
-
-fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
- let fields = aggr_expr
- .iter()
- .flat_map(|expr| expr.state_fields().unwrap().into_iter())
- .collect::<Vec<_>>();
- Ok(Arc::new(Schema::new(fields)))
-}
-
-impl GroupedHashAggregateStream {
- /// Create a new GroupedHashAggregateStream
+impl BoundedAggregateStream {
+ /// Create a new BoundedAggregateStream
#[allow(clippy::too_many_arguments)]
pub fn new(
mode: AggregateMode,
@@ -137,6 +128,8 @@ impl GroupedHashAggregateStream {
batch_size: usize,
context: Arc<TaskContext>,
partition: usize,
+ // Stores algorithm mode and output ordering
+ aggregation_ordering: AggregationOrdering,
) -> Result<Self> {
let timer = baseline_metrics.elapsed_compute().timer();
@@ -205,18 +198,18 @@ impl GroupedHashAggregateStream {
let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
- let name = format!("GroupedHashAggregateStream[{partition}]");
+ let name = format!("BoundedAggregateStream[{partition}]");
let aggr_state = AggregationState {
reservation: MemoryConsumer::new(name).register(context.memory_pool()),
map: RawTable::with_capacity(0),
- group_states: Vec::with_capacity(0),
+ ordered_group_states: Vec::with_capacity(0),
};
timer.done();
let exec_state = ExecutionState::ReadingInput;
- Ok(GroupedHashAggregateStream {
+ Ok(BoundedAggregateStream {
schema: Arc::clone(&schema),
input,
mode,
@@ -237,11 +230,13 @@ impl GroupedHashAggregateStream {
batch_size,
row_group_skip_position: 0,
indices: [normal_agg_indices, row_agg_indices],
+ is_end: false,
+ aggregation_ordering,
})
}
}
-impl Stream for GroupedHashAggregateStream {
+impl Stream for BoundedAggregateStream {
type Item = Result<RecordBatch>;
fn poll_next(
@@ -275,6 +270,10 @@ impl Stream for GroupedHashAggregateStream {
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
// inner is done, producing output
None => {
+ for element in self.aggr_state.ordered_group_states.iter_mut()
+ {
+ element.status = GroupStatus::CanEmit;
+ }
self.exec_state = ExecutionState::ProducingOutput;
}
}
@@ -285,12 +284,14 @@ impl Stream for GroupedHashAggregateStream {
let result = self.create_batch_from_map();
timer.done();
- self.row_group_skip_position += self.batch_size;
match result {
// made output
Ok(Some(result)) => {
let batch = result.record_output(&self.baseline_metrics);
+ self.row_group_skip_position += batch.num_rows();
+ self.exec_state = ExecutionState::ReadingInput;
+ self.prune();
return Poll::Ready(Some(Ok(batch)));
}
// end of output
@@ -307,21 +308,141 @@ impl Stream for GroupedHashAggregateStream {
}
}
-impl RecordBatchStream for GroupedHashAggregateStream {
+impl RecordBatchStream for BoundedAggregateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
-impl GroupedHashAggregateStream {
- // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+ owned_row: OwnedRow,
+ hash: u64,
+ range: Range<usize>,
+}
+
+impl BoundedAggregateStream {
+ // Update the aggr_state according to group_by values (result of group_by_expressions) when group by
+ // expressions are fully ordered.
+ fn update_ordered_group_state(
+ &mut self,
+ group_values: &[ArrayRef],
+ per_group_indices: Vec<GroupOrderInfo>,
+ allocated: &mut usize,
+ ) -> Result<Vec<usize>> {
+ // 1.1 construct the key from the group values
+ // 1.2 construct the mapping key if it does not exist
+ // 1.3 add the row' index to `indices`
+
+ // track which entries in `aggr_state` have rows in this batch to aggregate
+ let mut groups_with_rows = vec![];
+
+ let AggregationState {
+ map: row_map,
+ ordered_group_states: row_group_states,
+ ..
+ } = &mut self.aggr_state;
+
+ for GroupOrderInfo {
+ owned_row,
+ hash,
+ range,
+ } in per_group_indices
+ {
+ let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
+ // verify that a group that we are inserting with hash is
+ // actually the same key value as the group in
+ // existing_idx (aka group_values @ row)
+ let ordered_group_state = &row_group_states[*group_idx];
+ let group_state = &ordered_group_state.group_state;
+ owned_row.row() == group_state.group_by_values.row()
+ });
+
+ match entry {
+ // Existing entry for this group value
+ Some((_hash, group_idx)) => {
+ let group_state = &mut row_group_states[*group_idx].group_state;
+
+ // 1.3
+ if group_state.indices.is_empty() {
+ groups_with_rows.push(*group_idx);
+ };
+ for row in range.start..range.end {
+ // remember this row
+ group_state.indices.push_accounted(row as u32, allocated);
+ }
+ }
+ // 1.2 Need to create new entry
+ None => {
+ let accumulator_set =
+ aggregates::create_accumulators(&self.normal_aggr_expr)?;
+ let row = get_row_at_idx(group_values, range.start)?;
+ let ordered_columns = self
+ .aggregation_ordering
+ .order_indices
+ .iter()
+ .map(|idx| row[*idx].clone())
+ .collect::<Vec<_>>();
+ // Add new entry to group_states and save newly created index
+ let group_state = GroupState {
+ group_by_values: owned_row,
+ aggregation_buffer: vec![
+ 0;
+ self.row_aggr_layout.fixed_part_width()
+ ],
+ accumulator_set,
+ indices: (range.start as u32..range.end as u32)
+ .collect::<Vec<_>>(), // 1.3
+ };
+ let group_idx = row_group_states.len();
+
+ // NOTE: do NOT include the `OrderedGroupState` struct size in here because this is captured by
+ // `ordered_group_states` (see allocation down below)
+ *allocated += (std::mem::size_of::<u8>()
+ * group_state.group_by_values.as_ref().len())
+ + (std::mem::size_of::<u8>()
+ * group_state.aggregation_buffer.capacity())
+ + (std::mem::size_of::<u32>() * group_state.indices.capacity());
+
+ // Allocation done by normal accumulators
+ *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
+ * group_state.accumulator_set.capacity())
+ + group_state
+ .accumulator_set
+ .iter()
+ .map(|accu| accu.size())
+ .sum::<usize>();
+
+ // for hasher function, use precomputed hash value
+ row_map.insert_accounted(
+ (hash, group_idx),
+ |(hash, _group_index)| *hash,
+ allocated,
+ );
+
+ let ordered_group_state = OrderedGroupState {
+ group_state,
+ ordered_columns,
+ status: GroupStatus::GroupProgress,
+ hash,
+ };
+ row_group_states.push_accounted(ordered_group_state, allocated);
+
+ groups_with_rows.push(group_idx);
+ }
+ };
+ }
+ Ok(groups_with_rows)
+ }
+
+ // Update the aggr_state according to group_by values (result of group_by_expressions)
fn update_group_state(
&mut self,
group_values: &[ArrayRef],
allocated: &mut usize,
) -> Result<Vec<usize>> {
- let group_rows = self.row_converter.convert_columns(group_values)?;
- let n_rows = group_rows.num_rows();
// 1.1 construct the key from the group values
// 1.2 construct the mapping key if it does not exist
// 1.3 add the row' index to `indices`
@@ -329,12 +450,16 @@ impl GroupedHashAggregateStream {
// track which entries in `aggr_state` have rows in this batch to aggregate
let mut groups_with_rows = vec![];
+ let group_rows = self.row_converter.convert_columns(group_values)?;
+ let n_rows = group_rows.num_rows();
// 1.1 Calculate the group keys for the group values
let mut batch_hashes = vec![0; n_rows];
create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
let AggregationState {
- map, group_states, ..
+ map,
+ ordered_group_states: group_states,
+ ..
} = &mut self.aggr_state;
for (row, hash) in batch_hashes.into_iter().enumerate() {
@@ -342,14 +467,14 @@ impl GroupedHashAggregateStream {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
- let group_state = &group_states[*group_idx];
+ let group_state = &group_states[*group_idx].group_state;
group_rows.row(row) == group_state.group_by_values.row()
});
match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
- let group_state = &mut group_states[*group_idx];
+ let group_state = &mut group_states[*group_idx].group_state;
// 1.3
if group_state.indices.is_empty() {
@@ -362,7 +487,13 @@ impl GroupedHashAggregateStream {
None => {
let accumulator_set =
aggregates::create_accumulators(&self.normal_aggr_expr)?;
- // Add new entry to group_states and save newly created index
+ let row_values = get_row_at_idx(group_values, row)?;
+ let ordered_columns = self
+ .aggregation_ordering
+ .order_indices
+ .iter()
+ .map(|idx| row_values[*idx].clone())
+ .collect::<Vec<_>>();
let group_state = GroupState {
group_by_values: group_rows.row(row).owned(),
aggregation_buffer: vec![
@@ -398,7 +529,14 @@ impl GroupedHashAggregateStream {
allocated,
);
- group_states.push_accounted(group_state, allocated);
+ // Add new entry to group_states and save newly created index
+ let ordered_group_state = OrderedGroupState {
+ group_state,
+ ordered_columns,
+ status: GroupStatus::GroupProgress,
+ hash,
+ };
+ group_states.push_accounted(ordered_group_state, allocated);
groups_with_rows.push(group_idx);
}
@@ -407,7 +545,7 @@ impl GroupedHashAggregateStream {
Ok(groups_with_rows)
}
- // Update the accumulator results, according to row_aggr_state.
+ // Update the accumulator results, according to aggr_state.
#[allow(clippy::too_many_arguments)]
fn update_accumulators_using_batch(
&mut self,
@@ -428,7 +566,8 @@ impl GroupedHashAggregateStream {
.iter()
.zip(offsets.windows(2))
.try_for_each(|(group_idx, offsets)| {
- let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let group_state =
+ &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
// 2.2
// Process row accumulators
self.row_accumulators
@@ -490,7 +629,7 @@ impl GroupedHashAggregateStream {
Ok(())
}
- // Update the accumulator results, according to row_aggr_state.
+ // Update the accumulator results, according to aggr_state.
fn update_accumulators_using_scalar(
&mut self,
groups_with_rows: &[usize],
@@ -506,7 +645,8 @@ impl GroupedHashAggregateStream {
.collect::<Result<Vec<_>>>()?;
for group_idx in groups_with_rows {
- let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let group_state =
+ &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
let mut state_accessor =
RowAccessor::new_from_layout(self.row_aggr_layout.clone());
state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
@@ -562,8 +702,48 @@ impl GroupedHashAggregateStream {
let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
- let groups_with_rows =
- self.update_group_state(group_values, &mut allocated)?;
+ let groups_with_rows = if let AggregationOrdering {
+ mode: GroupByOrderMode::Ordered,
+ order_indices,
+ ordering,
+ } = &self.aggregation_ordering
+ {
+ let group_rows = self.row_converter.convert_columns(group_values)?;
+ let n_rows = group_rows.num_rows();
+ // 1.1 Calculate the group keys for the group values
+ let mut batch_hashes = vec![0; n_rows];
+ create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+ let sort_column = order_indices
+ .iter()
+ .enumerate()
+ .map(|(idx, cur_idx)| SortColumn {
+ values: group_values[*cur_idx].clone(),
+ options: Some(ordering[idx].options),
+ })
+ .collect::<Vec<_>>();
+ let n_rows = group_rows.num_rows();
+ let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
+ let per_group_indices = ranges
+ .into_iter()
+ .map(|range| {
+ let row = group_rows.row(range.start).owned();
+ // (row, batch_hashes[range.start], range)
+ GroupOrderInfo {
+ owned_row: row,
+ hash: batch_hashes[range.start],
+ range,
+ }
+ })
+ .collect::<Vec<_>>();
+ self.update_ordered_group_state(
+ group_values,
+ per_group_indices,
+ &mut allocated,
+ )?
+ } else {
+ self.update_group_state(group_values, &mut allocated)?
+ };
+
// Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet:
// 1) The aggregation mode is Partial or Single
// 2) There is not normal aggregation expressions
@@ -580,58 +760,97 @@ impl GroupedHashAggregateStream {
)?;
} else {
// Collect all indices + offsets based on keys in this vec
- let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
+ let mut batch_indices = UInt32Builder::with_capacity(0);
let mut offsets = vec![0];
let mut offset_so_far = 0;
for &group_idx in groups_with_rows.iter() {
- let indices = &self.aggr_state.group_states[group_idx].indices;
+ let indices = &self.aggr_state.ordered_group_states[group_idx]
+ .group_state
+ .indices;
batch_indices.append_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.finish();
- let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
- let normal_values =
- get_at_indices(&normal_aggr_input_values, &batch_indices)?;
let row_filter_values =
get_optional_filters(&row_filter_values, &batch_indices);
let normal_filter_values =
get_optional_filters(&normal_filter_values, &batch_indices);
- self.update_accumulators_using_batch(
- &groups_with_rows,
- &offsets,
- &row_values,
- &normal_values,
- &row_filter_values,
- &normal_filter_values,
- &mut allocated,
- )?;
+ if self.aggregation_ordering.mode == GroupByOrderMode::Ordered {
+ self.update_accumulators_using_batch(
+ &groups_with_rows,
+ &offsets,
+ &row_aggr_input_values,
+ &normal_aggr_input_values,
+ &row_filter_values,
+ &normal_filter_values,
+ &mut allocated,
+ )?;
+ } else {
+ let row_values =
+ get_at_indices(&row_aggr_input_values, &batch_indices)?;
+ let normal_values =
+ get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+ self.update_accumulators_using_batch(
+ &groups_with_rows,
+ &offsets,
+ &row_values,
+ &normal_values,
+ &row_filter_values,
+ &normal_filter_values,
+ &mut allocated,
+ )?;
+ };
}
}
allocated += self
.row_converter
.size()
.saturating_sub(row_converter_size_pre);
+
+ let mut new_result = false;
+ let last_ordered_columns = self
+ .aggr_state
+ .ordered_group_states
+ .last()
+ .map(|item| item.ordered_columns.clone());
+
+ if let Some(last_ordered_columns) = last_ordered_columns {
+ for cur_group in &mut self.aggr_state.ordered_group_states {
+ if cur_group.ordered_columns != last_ordered_columns {
+ // This group will not receive any more values. Set status to GroupStatus::CanEmit
+ // meaning we can generate result for this group.
+ cur_group.status = GroupStatus::CanEmit;
+ new_result = true;
+ }
+ }
+ }
+ if new_result {
+ self.exec_state = ExecutionState::ProducingOutput;
+ }
+
Ok(allocated)
}
}
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+ // `GroupProgress` means data for current group is not complete. New data may arrive.
+ GroupProgress,
+ // `CanEmit` means data for the current group is complete, and its result can be emitted.
+ CanEmit,
+ // `Emitted` means the result for the group has been output, so the group can be pruned from state.
+ Emitted,
+}
+
/// The state that is built for each output group.
#[derive(Debug)]
-pub struct GroupState {
- /// The actual group by values, stored sequentially
- group_by_values: OwnedRow,
-
- // Accumulator state, stored sequentially
- pub aggregation_buffer: Vec<u8>,
-
- // Accumulator state, one for each aggregate that doesn't support row accumulation
- pub accumulator_set: Vec<AccumulatorItem>,
-
- /// scratch space used to collect indices for input rows in a
- /// bach that have values to aggregate. Reset on each batch
- pub indices: Vec<u32>,
+pub struct OrderedGroupState {
+ group_state: GroupState,
+ ordered_columns: Vec<ScalarValue>,
+ status: GroupStatus,
+ hash: u64,
}
/// The state of all the groups
@@ -648,7 +867,7 @@ pub struct AggregationState {
pub map: RawTable<(u64, usize)>,
/// State for each group
- pub group_states: Vec<GroupState>,
+ pub ordered_group_states: Vec<OrderedGroupState>,
}
impl std::fmt::Debug for AggregationState {
@@ -657,28 +876,54 @@ impl std::fmt::Debug for AggregationState {
let map_string = "RawTable";
f.debug_struct("AggregationState")
.field("map", &map_string)
- .field("group_states", &self.group_states)
+ .field("ordered_group_states", &self.ordered_group_states)
.finish()
}
}
-impl GroupedHashAggregateStream {
+impl BoundedAggregateStream {
+ /// Prune the groups from `self.aggr_state.ordered_group_states` that are in
+ /// `GroupStatus::Emitted` status (i.e. their results have already been emitted,
+ /// and we are sure that these groups cannot receive new rows).
+ fn prune(&mut self) {
+ let n_partition = self.aggr_state.ordered_group_states.len();
+ self.aggr_state
+ .ordered_group_states
+ .retain(|elem| elem.status != GroupStatus::Emitted);
+ let n_partition_new = self.aggr_state.ordered_group_states.len();
+ let n_pruned = n_partition - n_partition_new;
+ self.aggr_state.map.clear();
+ for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() {
+ self.aggr_state
+ .map
+ .insert(item.hash, (item.hash, idx), |(hash, _)| *hash);
+ }
+ self.row_group_skip_position -= n_pruned;
+ }
+
/// Create a RecordBatch with all group keys and accumulator' states or values.
fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
let skip_items = self.row_group_skip_position;
- if skip_items > self.aggr_state.group_states.len() {
+ if skip_items > self.aggr_state.ordered_group_states.len() || self.is_end {
return Ok(None);
}
- if self.aggr_state.group_states.is_empty() {
+ self.is_end |= skip_items == self.aggr_state.ordered_group_states.len();
+ if self.aggr_state.ordered_group_states.is_empty() {
let schema = self.schema.clone();
return Ok(Some(RecordBatch::new_empty(schema)));
}
let end_idx = min(
skip_items + self.batch_size,
- self.aggr_state.group_states.len(),
+ self.aggr_state.ordered_group_states.len(),
);
- let group_state_chunk = &self.aggr_state.group_states[skip_items..end_idx];
+ let group_state_chunk =
+ &self.aggr_state.ordered_group_states[skip_items..end_idx];
+ // Consider only the groups that can be emitted. (The ones we are sure that will not receive new entry.)
+ let group_state_chunk = group_state_chunk
+ .iter()
+ .filter(|item| item.status == GroupStatus::CanEmit)
+ .collect::<Vec<_>>();
if group_state_chunk.is_empty() {
let schema = self.schema.clone();
@@ -688,7 +933,7 @@ impl GroupedHashAggregateStream {
// Buffers for each distinct group (i.e. row accumulator memories)
let mut state_buffers = group_state_chunk
.iter()
- .map(|gs| gs.aggregation_buffer.clone())
+ .map(|gs| gs.group_state.aggregation_buffer.clone())
.collect::<Vec<_>>();
let output_fields = self.schema.fields();
@@ -734,7 +979,7 @@ impl GroupedHashAggregateStream {
let current = match self.mode {
AggregateMode::Partial => ScalarValue::iter_to_array(
group_state_chunk.iter().map(|group_state| {
- group_state.accumulator_set[idx]
+ group_state.group_state.accumulator_set[idx]
.state()
.map(|v| v[field_idx].clone())
.expect("Unexpected accumulator state in hash aggregate")
@@ -744,7 +989,7 @@ impl GroupedHashAggregateStream {
| AggregateMode::FinalPartitioned
| AggregateMode::Single => ScalarValue::iter_to_array(
group_state_chunk.iter().map(|group_state| {
- group_state.accumulator_set[idx]
+ group_state.group_state.accumulator_set[idx]
.evaluate()
.expect("Unexpected accumulator state in hash aggregate")
}),
@@ -761,7 +1006,7 @@ impl GroupedHashAggregateStream {
// Stores the group by fields
let group_buffers = group_state_chunk
.iter()
- .map(|gs| gs.group_by_values.row())
+ .map(|gs| gs.group_state.group_by_values.row())
.collect::<Vec<_>>();
let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(group_buffers)?;
@@ -785,91 +1030,14 @@ impl GroupedHashAggregateStream {
}
}
}
- Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
- }
-}
-
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
- let row_num = rows.len();
- let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
- let mut row = RowReader::new(schema);
-
- for data in rows {
- row.point_to(0, data);
- read_row(&row, &mut output, schema);
- }
-
- output.output_as_columns()
-}
-
-fn get_at_indices(
- input_values: &[Vec<ArrayRef>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
- input_values
- .iter()
- .map(|array| get_arrayref_at_indices(array, batch_indices))
- .collect()
-}
-
-fn get_optional_filters(
- original_values: &[Option<Arc<dyn Array>>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
- original_values
- .iter()
- .map(|array| {
- array.as_ref().map(|array| {
- compute::take(
- array.as_ref(),
- batch_indices,
- None, // None: no index check
- )
- .unwrap()
- })
- })
- .collect()
-}
-fn slice_and_maybe_filter(
- aggr_array: &[ArrayRef],
- filter_opt: Option<&Arc<dyn Array>>,
- offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
- let sliced_arrays: Vec<ArrayRef> = aggr_array
- .iter()
- .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
- .collect();
-
- let filtered_arrays = match filter_opt.as_ref() {
- Some(f) => {
- let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
- let filter_array = as_boolean_array(&sliced)?;
-
- sliced_arrays
- .iter()
- .map(|array| filter(array, filter_array).unwrap())
- .collect::<Vec<ArrayRef>>()
+ // Set status of the emitted groups to GroupStatus::Emitted mode.
+ for gs in self.aggr_state.ordered_group_states[skip_items..end_idx].iter_mut() {
+ if gs.status == GroupStatus::CanEmit {
+ gs.status = GroupStatus::Emitted;
+ }
}
- None => sliced_arrays,
- };
- Ok(filtered_arrays)
-}
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
-fn col_to_scalar(
- array: &ArrayRef,
- filter: &Option<&BooleanArray>,
- row_index: usize,
-) -> Result<ScalarValue> {
- if array.is_null(row_index) {
- return Ok(ScalarValue::Null);
- }
- if let Some(filter) = filter {
- if !filter.value(row_index) {
- return Ok(ScalarValue::Null);
- }
+ Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
- ScalarValue::try_from_array(array, row_index)
}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 57525b387f..055f60346d 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -18,39 +18,43 @@
//! Aggregates functionalities
use crate::execution::context::TaskContext;
-use crate::physical_plan::aggregates::no_grouping::AggregateStream;
+use crate::physical_plan::aggregates::{
+ bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
+ row_hash::GroupedHashAggregateStream,
+};
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
- DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
};
use arrow::array::ArrayRef;
+use arrow::compute::DEFAULT_CAST_OPTIONS;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
-use datafusion_physical_expr::expressions::{Avg, CastExpr, Column, Sum};
use datafusion_physical_expr::{
- expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
+ aggregate::row_accumulator::RowAccumulator,
+ equivalence::project_equivalence_properties,
+ expressions::{Avg, CastExpr, Column, Sum},
+ normalize_out_expr_with_columns_map,
+ utils::{convert_to_expr, get_indices_of_matching_exprs},
+ AggregateExpr, PhysicalExpr, PhysicalSortExpr,
};
use std::any::Any;
use std::collections::HashMap;
-
-use arrow::compute::DEFAULT_CAST_OPTIONS;
use std::sync::Arc;
+mod bounded_aggregate_stream;
mod no_grouping;
mod row_hash;
+mod utils;
-use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStream;
-use crate::physical_plan::EquivalenceProperties;
pub use datafusion_expr::AggregateFunction;
-use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
-use datafusion_physical_expr::equivalence::project_equivalence_properties;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::normalize_out_expr_with_columns_map;
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -72,6 +76,21 @@ pub enum AggregateMode {
Single,
}
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+ /// Some of the expressions in the GROUP BY clause have an ordering.
+ // For example, if the input is ordered by a, b, c, d and we group by b, a, d,
+ // the mode will be `PartiallyOrdered`, meaning a subset of the group b, a, d
+ // (here a, b) defines a prefix of the existing ordering, while d does not.
+ PartiallyOrdered,
+ /// All the expressions in the GROUP BY clause have orderings.
+ // For example, if the input is ordered by a, b, c, d and we group by b, a,
+ // the mode will be `Ordered`, meaning all of the group b, a (taken in the
+ // order a, b) defines a prefix of the existing ordering.
+ Ordered,
+}
+
/// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)
/// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b]
/// and a single group [false, false].
@@ -173,6 +192,7 @@ impl PartialEq for PhysicalGroupBy {
+/// Stream types that an `AggregateExec` can produce for one partition; turned
+/// into a `SendableRecordBatchStream` by the `From` implementation below.
enum StreamType {
+ /// Used when there are no GROUP BY expressions.
AggregateStream(AggregateStream),
+ /// Used for GROUP BY when no `aggregation_ordering` could be derived.
GroupedHashAggregateStream(GroupedHashAggregateStream),
+ /// Used for GROUP BY when `aggregation_ordering` is set, i.e. (some of) the
+ /// GROUP BY expressions follow the input ordering.
+ BoundedAggregate(BoundedAggregateStream),
}
impl From<StreamType> for SendableRecordBatchStream {
@@ -180,10 +200,20 @@ impl From<StreamType> for SendableRecordBatchStream {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
+ StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
}
+/// Ordering information for an `AggregateExec` whose GROUP BY expressions
+/// (fully or partially) follow the ordering of its input.
+#[derive(Debug, Clone)]
+pub(crate) struct AggregationOrdering {
+ /// Whether all or only some of the GROUP BY expressions are ordered.
+ mode: GroupByOrderMode,
+ /// Stores indices such that when we iterate with these indices, GROUP BY
+ /// expressions match input ordering.
+ order_indices: Vec<usize>,
+ /// Output ordering of the aggregation, expressed in terms of the GROUP BY
+ /// columns of the output schema (reported by `output_ordering`).
+ ordering: Vec<PhysicalSortExpr>,
+}
+
/// Hash aggregate execution plan
#[derive(Debug)]
pub struct AggregateExec {
@@ -208,6 +238,99 @@ pub struct AggregateExec {
columns_map: HashMap<Column, Vec<Column>>,
/// Execution Metrics
metrics: ExecutionPlanMetricsSet,
+ /// Stores mode, and output ordering information for the `AggregateExec`.
+ aggregation_ordering: Option<AggregationOrdering>,
+}
+
+/// Calculates the working mode for `GROUP BY` queries.
+///
+/// Returns `None` when streaming execution is not possible, i.e. when there is
+/// more than one grouping set or when no GROUP BY expression matches a prefix
+/// of the input ordering. Otherwise returns the mode together with the indices
+/// of the GROUP BY expressions that match the input ordering, listed in
+/// input-ordering order:
+/// - If only some GROUP BY expressions have an ordering, the mode is
+/// `GroupByOrderMode::PartiallyOrdered`.
+/// - If all GROUP BY expressions have orderings, the mode is
+/// `GroupByOrderMode::Ordered`.
+fn get_working_mode(
+ input: &Arc<dyn ExecutionPlan>,
+ group_by: &PhysicalGroupBy,
+) -> Option<(GroupByOrderMode, Vec<usize>)> {
+ if group_by.groups.len() > 1 {
+ // We do not currently support streaming execution if we have more
+ // than one group (e.g. we have grouping sets).
+ return None;
+ }
+
+ let output_ordering = input.output_ordering().unwrap_or(&[]);
+ // Since direction of the ordering is not important for GROUP BY columns,
+ // we convert PhysicalSortExpr to PhysicalExpr in the existing ordering.
+ let ordering_exprs = convert_to_expr(output_ordering);
+ let groupby_exprs = group_by
+ .expr
+ .iter()
+ .map(|(item, _)| item.clone())
+ .collect::<Vec<_>>();
+ // Find where each expression of the GROUP BY clause occurs in the existing
+ // ordering (if it occurs):
+ let mut ordered_indices =
+ get_indices_of_matching_exprs(&groupby_exprs, &ordering_exprs, || {
+ input.equivalence_properties()
+ });
+ // Plain integers: an unstable sort yields the same result without allocating.
+ ordered_indices.sort_unstable();
+ // Find out how many expressions of the existing ordering define ordering
+ // for expressions in the GROUP BY clause. For example, if the input is
+ // ordered by a, b, c, d and we group by b, a, d; the result below would be
+ // 2, meaning 2 elements (a, b) among the GROUP BY columns define ordering.
+ let first_n = longest_consecutive_prefix(ordered_indices);
+ if first_n == 0 {
+ // No GROUP by columns are ordered, we can not do streaming execution.
+ return None;
+ }
+ // Borrow the ordered prefix instead of copying it into a new vector.
+ let ordered_exprs = &ordering_exprs[..first_n];
+ // Find indices for the GROUP BY expressions such that when we iterate with
+ // these indices, we would match existing ordering. For the example above,
+ // this would produce 1, 0; meaning 1st and 0th entries (a, b) among the
+ // GROUP BY expressions b, a, d match input ordering.
+ let ordered_group_by_indices =
+ get_indices_of_matching_exprs(ordered_exprs, &groupby_exprs, || {
+ input.equivalence_properties()
+ });
+ let mode = if first_n == groupby_exprs.len() {
+ GroupByOrderMode::Ordered
+ } else {
+ GroupByOrderMode::PartiallyOrdered
+ };
+ Some((mode, ordered_group_by_indices))
+}
+
+/// Derives the `AggregationOrdering` of an aggregation over `input`, or `None`
+/// when the GROUP BY expressions do not follow the input ordering.
+fn calc_aggregation_ordering(
+ input: &Arc<dyn ExecutionPlan>,
+ group_by: &PhysicalGroupBy,
+) -> Option<AggregationOrdering> {
+ let (mode, order_indices) = get_working_mode(input, group_by)?;
+ let input_ordering = input.output_ordering().unwrap_or(&[]);
+ let group_columns = output_group_expr_helper(group_by);
+ // Pair each ordered GROUP BY position with the corresponding input sort
+ // key, so the output column inherits that key's sort options.
+ let ordering = order_indices
+ .iter()
+ .zip(input_ordering.iter())
+ .map(|(&group_idx, sort_key)| PhysicalSortExpr {
+ expr: Arc::clone(&group_columns[group_idx]),
+ options: sort_key.options,
+ })
+ .collect();
+ Some(AggregationOrdering {
+ mode,
+ order_indices,
+ ordering,
+ })
+}
+
+/// Grouping expressions as they occur in the output schema.
+///
+/// Each GROUP BY expression is replaced by a `Column` that refers to it by its
+/// name and its position in the output schema.
+fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalExpr>> {
+ // Since the group by columns come first in the output schema, their
+ // indices are simply 0..group_by.expr().len().
+ group_by
+ .expr()
+ .iter()
+ .enumerate()
+ .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
+ .collect()
}
impl AggregateExec {
@@ -240,6 +363,8 @@ impl AggregateExec {
};
}
+ let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
+
Ok(AggregateExec {
mode,
group_by,
@@ -250,6 +375,7 @@ impl AggregateExec {
input_schema,
columns_map,
metrics: ExecutionPlanMetricsSet::new(),
+ aggregation_ordering,
})
}
@@ -265,16 +391,7 @@ impl AggregateExec {
/// Grouping expressions as they occur in the output schema
pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- // Update column indices. Since the group by columns come first in the output schema, their
- // indices are simply 0..self.group_expr(len).
- self.group_by
- .expr()
- .iter()
- .enumerate()
- .map(|(index, (_col, name))| {
- Arc::new(expressions::Column::new(name, index)) as Arc<dyn PhysicalExpr>
- })
- .collect()
+ // Same helper that `calc_aggregation_ordering` uses, so the output ordering
+ // and the reported group expressions refer to identical columns.
+ output_group_expr_helper(&self.group_by)
}
/// Aggregate expressions
@@ -305,6 +422,7 @@ impl AggregateExec {
let batch_size = context.session_config().batch_size();
let input = self.input.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
if self.group_by.expr.is_empty() {
Ok(StreamType::AggregateStream(AggregateStream::new(
self.mode,
@@ -316,6 +434,20 @@ impl AggregateExec {
context,
partition,
)?))
+ } else if let Some(aggregation_ordering) = &self.aggregation_ordering {
+ Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
+ self.mode,
+ self.schema.clone(),
+ self.group_by.clone(),
+ self.aggr_expr.clone(),
+ self.filter_expr.clone(),
+ input,
+ baseline_metrics,
+ batch_size,
+ context,
+ partition,
+ aggregation_ordering.clone(),
+ )?))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(
@@ -373,20 +505,27 @@ impl ExecutionPlan for AggregateExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
- if children[0] {
- Err(DataFusionError::Plan(
- "Aggregate Error: `GROUP BY` clause (including the more general GROUPING SET) is not supported for unbounded inputs.".to_string(),
- ))
- } else {
- Ok(false)
- }
+ let input_is_unbounded = children[0];
+ if input_is_unbounded && self.aggregation_ordering.is_none() {
+ // Cannot run without breaking pipeline.
+ return Err(DataFusionError::Plan(
+ "Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs.".to_string(),
+ ));
+ }
+ // Bounded input gives bounded output; ordered unbounded input is streamed.
+ Ok(input_is_unbounded)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ // Only an aggregation that follows its input ordering reports an ordering.
+ let aggregation_ordering = self.aggregation_ordering.as_ref()?;
+ Some(aggregation_ordering.ordering.as_slice())
}
fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -801,13 +940,14 @@ mod tests {
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::from_slice::FromSlice;
use crate::physical_plan::aggregates::{
- AggregateExec, AggregateMode, PhysicalGroupBy,
+ get_working_mode, AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::{col, Avg};
- use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
+ use crate::test::{assert_is_pending, csv_exec_sorted};
use crate::{assert_batches_sorted_eq, physical_plan::common};
use arrow::array::{Float64Array, UInt32Array};
+ use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
@@ -819,6 +959,7 @@ mod tests {
use std::task::{Context, Poll};
use super::StreamType;
+ use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
@@ -826,6 +967,92 @@ mod tests {
};
use crate::prelude::SessionContext;
+ /// Generates a schema with five nullable `Int32` columns: a, b, c, d, e.
+ fn create_test_schema() -> Result<SchemaRef> {
+ let fields = ["a", "b", "c", "d", "e"]
+ .iter()
+ .map(|name| Field::new(*name, DataType::Int32, true))
+ .collect::<Vec<_>>();
+ Ok(Arc::new(Schema::new(fields)))
+ }
+
+ /// Builds a `PhysicalSortExpr` on column `name` of `schema` using
+ /// `SortOptions::default()`. Panics if the column lookup fails.
+ fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+ sort_expr_options(name, schema, SortOptions::default())
+ }
+
+ /// Builds a `PhysicalSortExpr` on column `name` of `schema` with the given
+ /// sort `options`. Panics if the column lookup fails.
+ fn sort_expr_options(
+ name: &str,
+ schema: &Schema,
+ options: SortOptions,
+ ) -> PhysicalSortExpr {
+ PhysicalSortExpr {
+ expr: col(name, schema).unwrap(),
+ options,
+ }
+ }
+
+ #[test]
+ fn test_get_working_mode() -> Result<()> {
+ let test_schema = create_test_schema()?;
+ // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST.
+ // Columns d and e are not ordered.
+ let sort_exprs = vec![
+ sort_expr("a", &test_schema),
+ sort_expr("b", &test_schema),
+ sort_expr("c", &test_schema),
+ ];
+ let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+ // Each test case is a tuple:
+ // - The first field lists the GROUP BY columns, e.g. `vec!["a", "b"]`
+ // corresponds to GROUP BY a, b.
+ // - The second field is the expected result of `get_working_mode`. `None`
+ // means the existing ordering is not sufficient to run the aggregation in
+ // streaming mode (we would need to add a SortExec). `Some((mode, indices))`
+ // gives the expected `GroupByOrderMode` together with the indices of the
+ // GROUP BY columns that match the input ordering, in input-ordering order.
+ let test_cases = vec![
+ (vec!["a"], Some((Ordered, vec![0]))),
+ (vec!["b"], None),
+ (vec!["c"], None),
+ (vec!["b", "a"], Some((Ordered, vec![1, 0]))),
+ (vec!["c", "b"], None),
+ (vec!["c", "a"], Some((PartiallyOrdered, vec![1]))),
+ (vec!["c", "b", "a"], Some((Ordered, vec![2, 1, 0]))),
+ (vec!["d", "a"], Some((PartiallyOrdered, vec![1]))),
+ (vec!["d", "b"], None),
+ (vec!["d", "c"], None),
+ (vec!["d", "b", "a"], Some((PartiallyOrdered, vec![2, 1]))),
+ (vec!["d", "c", "b"], None),
+ (vec!["d", "c", "a"], Some((PartiallyOrdered, vec![2]))),
+ (
+ vec!["d", "c", "b", "a"],
+ Some((PartiallyOrdered, vec![3, 2, 1])),
+ ),
+ ];
+ for (case_idx, test_case) in test_cases.iter().enumerate() {
+ let (group_by_columns, expected) = test_case;
+ let mut group_by_exprs = Vec::with_capacity(group_by_columns.len());
+ for col_name in group_by_columns {
+ group_by_exprs.push((col(col_name, &test_schema)?, col_name.to_string()));
+ }
+ let group_bys = PhysicalGroupBy::new_single(group_by_exprs);
+ let res = get_working_mode(&input, &group_bys);
+ assert_eq!(
+ res, *expected,
+ "Unexpected result for test case #{}: {:?}",
+ case_idx, test_case
+ );
+ }
+
+ Ok(())
+ }
+
/// some mock data to aggregates
fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
// define a schema.
@@ -940,9 +1167,7 @@ mod tests {
let result =
common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
- assert_eq!(result.len(), 1);
-
- let batch = &result[0];
+ let batch = concat_batches(&result[0].schema(), &result)?;
assert_eq!(batch.num_columns(), 3);
assert_eq!(batch.num_rows(), 12);
@@ -1037,9 +1262,7 @@ mod tests {
let result =
common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
- assert_eq!(result.len(), 1);
-
- let batch = &result[0];
+ let batch = concat_batches(&result[0].schema(), &result)?;
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index bf1846ae98..7b851aa1da 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
-use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::row::{RowConverter, SortField};
use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};
@@ -32,25 +32,26 @@ use futures::stream::{Stream, StreamExt};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use crate::physical_plan::aggregates::utils::{
+ aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
+ read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
+};
use crate::physical_plan::aggregates::{
- evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
- AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
+ evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+ PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
-use arrow::compute::{cast, filter};
-use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow::compute::cast;
+use arrow::datatypes::DataType;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;
use itertools::izip;
@@ -68,7 +69,6 @@ use itertools::izip;
/// 4. The state's RecordBatch is `merge`d to a new state
/// 5. The state is mapped to the final value
///
-/// [Arrow-row]: OwnedRow
/// [WordAligned]: datafusion_row::layout
pub(crate) struct GroupedHashAggregateStream {
schema: SchemaRef,
@@ -107,22 +107,6 @@ pub(crate) struct GroupedHashAggregateStream {
indices: [Vec<Range<usize>>; 2],
}
-#[derive(Debug)]
-/// tracks what phase the aggregation is in
-enum ExecutionState {
- ReadingInput,
- ProducingOutput,
- Done,
-}
-
-fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
- let fields = aggr_expr
- .iter()
- .flat_map(|expr| expr.state_fields().unwrap().into_iter())
- .collect::<Vec<_>>();
- Ok(Arc::new(Schema::new(fields)))
-}
-
impl GroupedHashAggregateStream {
/// Create a new GroupedHashAggregateStream
#[allow(clippy::too_many_arguments)]
@@ -617,25 +601,8 @@ impl GroupedHashAggregateStream {
}
}
-/// The state that is built for each output group.
-#[derive(Debug)]
-pub struct GroupState {
- /// The actual group by values, stored sequentially
- group_by_values: OwnedRow,
-
- // Accumulator state, stored sequentially
- pub aggregation_buffer: Vec<u8>,
-
- // Accumulator state, one for each aggregate that doesn't support row accumulation
- pub accumulator_set: Vec<AccumulatorItem>,
-
- /// scratch space used to collect indices for input rows in a
- /// bach that have values to aggregate. Reset on each batch
- pub indices: Vec<u32>,
-}
-
/// The state of all the groups
-pub struct AggregationState {
+pub(crate) struct AggregationState {
pub reservation: MemoryReservation,
/// Logically maps group values to an index in `group_states`
@@ -788,88 +755,3 @@ impl GroupedHashAggregateStream {
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
}
-
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
- let row_num = rows.len();
- let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
- let mut row = RowReader::new(schema);
-
- for data in rows {
- row.point_to(0, data);
- read_row(&row, &mut output, schema);
- }
-
- output.output_as_columns()
-}
-
-fn get_at_indices(
- input_values: &[Vec<ArrayRef>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
- input_values
- .iter()
- .map(|array| get_arrayref_at_indices(array, batch_indices))
- .collect()
-}
-
-fn get_optional_filters(
- original_values: &[Option<Arc<dyn Array>>],
- batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
- original_values
- .iter()
- .map(|array| {
- array.as_ref().map(|array| {
- compute::take(
- array.as_ref(),
- batch_indices,
- None, // None: no index check
- )
- .unwrap()
- })
- })
- .collect()
-}
-
-fn slice_and_maybe_filter(
- aggr_array: &[ArrayRef],
- filter_opt: Option<&Arc<dyn Array>>,
- offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
- let sliced_arrays: Vec<ArrayRef> = aggr_array
- .iter()
- .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
- .collect();
-
- let filtered_arrays = match filter_opt.as_ref() {
- Some(f) => {
- let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
- let filter_array = as_boolean_array(&sliced)?;
-
- sliced_arrays
- .iter()
- .map(|array| filter(array, filter_array).unwrap())
- .collect::<Vec<ArrayRef>>()
- }
- None => sliced_arrays,
- };
- Ok(filtered_arrays)
-}
-
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
-fn col_to_scalar(
- array: &ArrayRef,
- filter: &Option<&BooleanArray>,
- row_index: usize,
-) -> Result<ScalarValue> {
- if array.is_null(row_index) {
- return Ok(ScalarValue::Null);
- }
- if let Some(filter) = filter {
- if !filter.value(row_index) {
- return Ok(ScalarValue::Null);
- }
- }
- ScalarValue::try_from_array(array, row_index)
-}
diff --git a/datafusion/core/src/physical_plan/aggregates/utils.rs b/datafusion/core/src/physical_plan/aggregates/utils.rs
new file mode 100644
index 0000000000..54485b1740
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/utils.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_plan::aggregates::AccumulatorItem;
+use arrow::compute;
+use arrow::compute::filter;
+use arrow::row::OwnedRow;
+use arrow_array::types::UInt32Type;
+use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_physical_expr::AggregateExpr;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::MutableRecordBatch;
+use std::sync::Arc;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+pub(crate) struct GroupState {
+ /// The actual group by values, stored sequentially
+ pub group_by_values: OwnedRow,
+
+ /// Accumulator state, stored sequentially
+ pub aggregation_buffer: Vec<u8>,
+
+ /// Accumulator state, one for each aggregate that doesn't support row accumulation
+ pub accumulator_set: Vec<AccumulatorItem>,
+
+ /// Scratch space used to collect indices for input rows in a
+ /// batch that have values to aggregate. Reset on each batch
+ pub indices: Vec<u32>,
+}
+
+/// Tracks which phase of execution an aggregation stream is in.
+#[derive(Debug)]
+pub(crate) enum ExecutionState {
+ ReadingInput,
+ ProducingOutput,
+ Done,
+}
+
+/// Builds the schema of the intermediate aggregation state by concatenating
+/// the state fields of every aggregate expression, in order.
+///
+/// Errors from `AggregateExpr::state_fields` are returned to the caller
+/// instead of causing a panic.
+pub(crate) fn aggr_state_schema(
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+) -> Result<SchemaRef> {
+ let mut fields = Vec::new();
+ for expr in aggr_expr {
+ fields.extend(expr.state_fields()?);
+ }
+ Ok(Arc::new(Schema::new(fields)))
+}
+
+/// Decodes the row-format buffers in `rows` (laid out according to `schema`)
+/// and returns the decoded data as column arrays.
+pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
+ let mut batch = MutableRecordBatch::new(rows.len(), Arc::new(schema.clone()));
+ let mut reader = RowReader::new(schema);
+ rows.iter().for_each(|row_bytes| {
+ reader.point_to(0, row_bytes);
+ read_row(&reader, &mut batch, schema);
+ });
+ batch.output_as_columns()
+}
+
+/// Gathers the rows at `batch_indices` from every group of arrays in
+/// `input_values`, preserving their order; stops at the first error.
+pub(crate) fn get_at_indices(
+ input_values: &[Vec<ArrayRef>],
+ batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<Vec<ArrayRef>>> {
+ let mut gathered = Vec::with_capacity(input_values.len());
+ for arrays in input_values {
+ gathered.push(get_arrayref_at_indices(arrays, batch_indices)?);
+ }
+ Ok(gathered)
+}
+
+/// Applies `batch_indices` (via `compute::take`) to every filter that is
+/// present, leaving absent filters as `None`.
+///
+/// Panics if `take` returns an error.
+pub(crate) fn get_optional_filters(
+ original_values: &[Option<Arc<dyn Array>>],
+ batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Vec<Option<Arc<dyn Array>>> {
+ let take_rows = |filter_array: &Arc<dyn Array>| {
+ // None: no index check
+ compute::take(filter_array.as_ref(), batch_indices, None).unwrap()
+ };
+ original_values
+ .iter()
+ .map(|maybe_filter| maybe_filter.as_ref().map(&take_rows))
+ .collect()
+}
+
+/// Slices every array in `aggr_array` to the row range
+/// `offsets[0]..offsets[1]` and, when `filter_opt` is given, keeps only the
+/// rows whose identically sliced boolean filter value is `true`.
+///
+/// Errors from the boolean cast or from `filter` are returned instead of
+/// causing a panic. Panics if `offsets` has fewer than two elements.
+pub(crate) fn slice_and_maybe_filter(
+ aggr_array: &[ArrayRef],
+ filter_opt: Option<&Arc<dyn Array>>,
+ offsets: &[usize],
+) -> Result<Vec<ArrayRef>> {
+ let (offset, len) = (offsets[0], offsets[1] - offsets[0]);
+ let sliced_arrays: Vec<ArrayRef> = aggr_array
+ .iter()
+ .map(|array| array.slice(offset, len))
+ .collect();
+
+ match filter_opt {
+ Some(f) => {
+ let sliced = f.slice(offset, len);
+ let filter_array = as_boolean_array(&sliced)?;
+ // Propagate Arrow errors (converted into DataFusionError by `?`).
+ let filtered = sliced_arrays
+ .iter()
+ .map(|array| filter(array, filter_array))
+ .collect::<std::result::Result<Vec<ArrayRef>, _>>()?;
+ Ok(filtered)
+ }
+ None => Ok(sliced_arrays),
+ }
+}
+
+/// Converts the value at `row_index` of `array` into a `ScalarValue`.
+///
+/// This is similar to `ScalarValue::try_from_array` except for the null
+/// handling: a null entry, or a row rejected by `filter`, is returned as the
+/// untyped `ScalarValue::Null` instead of a typed null such as
+/// `ScalarValue::Int32(None)`.
+pub(crate) fn col_to_scalar(
+ array: &ArrayRef,
+ filter: &Option<&BooleanArray>,
+ row_index: usize,
+) -> Result<ScalarValue> {
+ // `||` keeps the original order: the filter is only consulted for non-null rows.
+ if array.is_null(row_index) || filter.map_or(false, |f| !f.value(row_index)) {
+ return Ok(ScalarValue::Null);
+ }
+ ScalarValue::try_from_array(array, row_index)
+}
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 2e12251c2f..08b5bb34ed 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index ec7dd7b4d6..82f37ea396 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 8ff8a37aa3..fe667d1e6e 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 494d3fc869..3786568cf3 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 44a91865e2..67b7fd9718 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] || children[1] {
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index c624c5c17d..05a3c25243 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
let (left, right) = (children[0], children[1]);
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 23d3d70848..4356d4aa85 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -514,13 +514,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
for (join_side, child) in join_sides.iter().zip(children.iter()) {
let sorted_expr = child
.output_ordering()
- .and_then(|orders| orders.first())
- .and_then(|order| {
+ .and_then(|orders| {
build_filter_input_order(
*join_side,
filter,
&child.schema(),
- order,
+ &orders[0],
)
.transpose()
})
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 50c145095b..01ece80aca 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -111,7 +111,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn output_partitioning(&self) -> Partitioning;
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Ok(false)
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 799de0c191..49c429f94d 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -156,7 +156,7 @@ impl ExecutionPlan for ProjectionExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 7f13418d26..8db230a122 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -318,7 +318,7 @@ impl ExecutionPlan for RepartitionExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 9196f520ad..5f644b658b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -739,7 +739,7 @@ impl ExecutionPlan for SortExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 0448813240..cace842aaf 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -157,7 +157,7 @@ impl ExecutionPlan for UnionExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children.iter().any(|x| *x))
@@ -355,7 +355,7 @@ impl ExecutionPlan for InterleaveExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children.iter().any(|x| *x))
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index 0d60273c30..9a9408035d 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for UnnestExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 5dbafab76f..ac84a6a395 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -171,8 +171,7 @@ impl BoundedWindowAggExec {
// to calculate partition separation points
pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
// Partition by sort keys indices are stored in self.ordered_partition_by_indices.
- let sort_keys = self.input.output_ordering();
- let sort_keys = sort_keys.unwrap_or(&[]);
+ let sort_keys = self.input.output_ordering().unwrap_or(&[]);
get_at_indices(sort_keys, &self.ordered_partition_by_indices)
}
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index bda1b52ff8..7ca95954ce 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -144,7 +144,7 @@ impl ExecutionPlan for WindowAggExec {
}
/// Specifies whether this plan generates an infinite stream of records.
- /// If the plan does not support pipelining, but it its input(s) are
+ /// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
diff --git a/datafusion/core/tests/aggregate_fuzz.rs b/datafusion/core/tests/aggregate_fuzz.rs
new file mode 100644
index 0000000000..14cf469624
--- /dev/null
+++ b/datafusion/core/tests/aggregate_fuzz.rs
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array};
+use arrow::compute::{concat_batches, SortOptions};
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_physical_expr::expressions::{col, Sum};
+use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
+use test_utils::add_empty_batches;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Fuzz test: for several GROUP BY column sets and seeds, check that
+    /// aggregation over sort-annotated input matches plain hash aggregation.
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn aggregate_test() {
+        // GROUP BY column lists. `make_staggered_batches` sorts the input
+        // lexicographically on `(a, b, c)`; `d` is unsorted.
+        let test_cases = vec![
+            vec!["a"],
+            vec!["b", "a"],
+            vec!["c", "a"],
+            vec!["c", "b", "a"],
+            vec!["d", "a"],
+            vec!["d", "b", "a"],
+            vec!["d", "c", "a"],
+            vec!["d", "c", "b", "a"],
+        ];
+        let n = 300;
+        let distincts = [10, 20];
+        for distinct in distincts {
+            let mut handles = Vec::new();
+            for i in 0..n {
+                let test_idx = i % test_cases.len();
+                let group_by_columns = test_cases[test_idx].clone();
+                let job = tokio::spawn(run_aggregate_test(
+                    make_staggered_batches::<true>(1000, distinct, i as u64),
+                    group_by_columns,
+                ));
+                handles.push(job);
+            }
+            for job in handles {
+                job.await.unwrap();
+            }
+        }
+    }
+}
+
+/// Perform batch and streaming aggregation with the same input and verify
+/// that `AggregateExec` produces the same result with the pipeline-breaking
+/// stream `GroupedHashAggregateStream` and the non-pipeline-breaking stream
+/// `BoundedAggregateStream`.
+///
+/// The "usual" plan reads `input1` concatenated into a single batch with no
+/// ordering information; the "running" plan reads the original batches and
+/// declares them sorted on `a, b, c`.
+async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
+    let schema = input1[0].schema();
+    let session_config = SessionConfig::new().with_batch_size(50);
+    let ctx = SessionContext::with_config(session_config);
+    let sort_keys = ["a", "b", "c"]
+        .iter()
+        .map(|ordering_col| PhysicalSortExpr {
+            expr: col(ordering_col, &schema).unwrap(),
+            options: SortOptions::default(),
+        })
+        .collect::<Vec<_>>();
+
+    let concat_input_record = concat_batches(&schema, &input1).unwrap();
+    let usual_source = Arc::new(
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
+    );
+
+    let running_source = Arc::new(
+        MemoryExec::try_new(&[input1], schema.clone(), None)
+            .unwrap()
+            .with_sort_information(sort_keys),
+    );
+
+    let aggregate_expr = vec![Arc::new(Sum::new(
+        col("d", &schema).unwrap(),
+        "sum1",
+        DataType::Int64,
+    )) as Arc<dyn AggregateExpr>];
+    let expr = group_by_columns
+        .iter()
+        .map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(expr);
+    let aggregate_exec_running = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by.clone(),
+            aggregate_expr.clone(),
+            vec![None],
+            running_source,
+            schema.clone(),
+        )
+        .unwrap(),
+    ) as _;
+
+    let aggregate_exec_usual = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggregate_expr,
+            vec![None],
+            usual_source,
+            schema,
+        )
+        .unwrap(),
+    ) as _;
+
+    let task_ctx = ctx.task_ctx();
+    let collected_usual = collect(aggregate_exec_usual, task_ctx.clone())
+        .await
+        .unwrap();
+
+    let collected_running = collect(aggregate_exec_running, task_ctx).await.unwrap();
+    assert!(collected_running.len() > 2);
+    // Running should produce more chunks than the usual AggregateExec.
+    // Otherwise it means that we cannot generate results in running mode.
+    assert!(collected_running.len() > collected_usual.len());
+    // Compare sorted output lines: the two plans may emit groups in different
+    // orders and split them across different numbers of batches.
+    let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
+    let running_formatted = pretty_format_batches(&collected_running)
+        .unwrap()
+        .to_string();
+
+    let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect();
+    usual_formatted_sorted.sort_unstable();
+
+    let mut running_formatted_sorted: Vec<&str> =
+        running_formatted.trim().lines().collect();
+    running_formatted_sorted.sort_unstable();
+    // `zip` below stops at the shorter side, so check the lengths first;
+    // otherwise missing or extra result rows could go unnoticed.
+    assert_eq!(
+        usual_formatted_sorted.len(),
+        running_formatted_sorted.len(),
+        "Inconsistent number of result lines"
+    );
+    for (i, (usual_line, running_line)) in usual_formatted_sorted
+        .iter()
+        .zip(&running_formatted_sorted)
+        .enumerate()
+    {
+        assert_eq!((i, usual_line), (i, running_line), "Inconsistent result");
+    }
+}
+
+/// Return randomly sized record batches with:
+/// - three int64 columns `a`, `b`, `c` with values in `0..n_distinct`, sorted
+///   lexicographically on `(a, b, c)`;
+/// - one int64 column `d` with random, unsorted values in `0..n_distinct`.
+///
+/// With `STREAM = true`, each batch has fewer than 50 rows and splitting stops
+/// once the next randomly drawn batch size exceeds the rows left, so trailing
+/// rows are dropped and the output may hold fewer than `len` rows. With
+/// `STREAM = false`, all `len` rows are split at random points. The result is
+/// then passed through `test_utils::add_empty_batches`.
+pub(crate) fn make_staggered_batches<const STREAM: bool>(
+    len: usize,
+    n_distinct: usize,
+    random_seed: u64,
+) -> Vec<RecordBatch> {
+    // Seeded RNG so that the same seed always yields the same batches.
+    let mut rng = StdRng::seed_from_u64(random_seed);
+    // Draw all (a, b, c) triples first, then all `d` values; this draw order
+    // determines the data produced for a given seed.
+    let mut input123: Vec<(i64, i64, i64)> = (0..len)
+        .map(|_| {
+            (
+                rng.gen_range(0..n_distinct) as i64,
+                rng.gen_range(0..n_distinct) as i64,
+                rng.gen_range(0..n_distinct) as i64,
+            )
+        })
+        .collect();
+    let input4: Vec<i64> = (0..len)
+        .map(|_| rng.gen_range(0..n_distinct) as i64)
+        .collect();
+    // Tuples sort lexicographically: `a` is sorted, `b` is sorted within equal
+    // `a`, and `c` is sorted within equal `(a, b)`.
+    input123.sort_unstable();
+    let input1 = Int64Array::from_iter_values(input123.iter().map(|k| k.0));
+    let input2 = Int64Array::from_iter_values(input123.iter().map(|k| k.1));
+    let input3 = Int64Array::from_iter_values(input123.iter().map(|k| k.2));
+    let input4 = Int64Array::from_iter_values(input4);
+
+    // split into several record batches
+    let mut remainder = RecordBatch::try_from_iter(vec![
+        ("a", Arc::new(input1) as ArrayRef),
+        ("b", Arc::new(input2) as ArrayRef),
+        ("c", Arc::new(input3) as ArrayRef),
+        ("d", Arc::new(input4) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut batches = vec![];
+    if STREAM {
+        while remainder.num_rows() > 0 {
+            let batch_size = rng.gen_range(0..50);
+            if remainder.num_rows() < batch_size {
+                // The leftover rows are discarded, not emitted as a short batch.
+                break;
+            }
+            batches.push(remainder.slice(0, batch_size));
+            remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+        }
+    } else {
+        while remainder.num_rows() > 0 {
+            let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+            batches.push(remainder.slice(0, batch_size));
+            remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+        }
+    }
+    add_empty_batches(batches, &mut rng)
+}
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index b4a92db3fc..299c49028b 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -16,6 +16,7 @@
// under the License.
use super::*;
+use datafusion::test_util::get_test_context2;
#[tokio::test]
async fn group_by_date_trunc() -> Result<()> {
@@ -160,3 +161,96 @@ async fn group_by_dictionary() {
run_test_case::<UInt32Type>().await;
run_test_case::<UInt64Type>().await;
}
+
+/// Plans `sql` on `ctx` and asserts that its indented physical plan, minus the
+/// last line, equals `expected`. The last line (presumably the source scan,
+/// whose text may include the temporary file path) is ignored.
+async fn assert_physical_plan_prefix(
+    ctx: &SessionContext,
+    sql: &str,
+    expected: &[&str],
+) -> Result<()> {
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let (_, actual_trim_last) = actual
+        .split_last()
+        .expect("physical plan should have at least one line");
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_source_sorted_groupby() -> Result<()> {
+    let tmpdir = TempDir::new().unwrap();
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
+
+    let sql = "SELECT a, b,
+ SUM(c) as summation1
+ FROM annotated_data
+ GROUP BY b, a";
+
+    let expected_plan = [
+        "ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]",
+        " AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)]",
+    ];
+    assert_physical_plan_prefix(&ctx, sql, &expected_plan).await?;
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---+---+------------+",
+        "| a | b | summation1 |",
+        "+---+---+------------+",
+        "| 0 | 0 | 300 |",
+        "| 0 | 1 | 925 |",
+        "| 1 | 2 | 1550 |",
+        "| 1 | 3 | 2175 |",
+        "+---+---+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_source_sorted_groupby2() -> Result<()> {
+    let tmpdir = TempDir::new().unwrap();
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
+
+    let sql = "SELECT a, d,
+ SUM(c) as summation1
+ FROM annotated_data
+ GROUP BY d, a";
+
+    let expected_plan = [
+        "ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]",
+        " AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)]",
+    ];
+    assert_physical_plan_prefix(&ctx, sql, &expected_plan).await?;
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---+---+------------+",
+        "| a | d | summation1 |",
+        "+---+---+------------+",
+        "| 0 | 0 | 292 |",
+        "| 0 | 2 | 196 |",
+        "| 0 | 1 | 315 |",
+        "| 0 | 4 | 164 |",
+        "| 0 | 3 | 258 |",
+        "| 1 | 0 | 622 |",
+        "| 1 | 3 | 299 |",
+        "| 1 | 1 | 1043 |",
+        "| 1 | 4 | 913 |",
+        "| 1 | 2 | 848 |",
+        "+---+---+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 8b95005e1b..60339955b4 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -356,23 +356,22 @@ Sort: d.b ASC NULLS LAST
EmptyRelation
physical_plan
SortPreservingMergeExec: [b@0 ASC NULLS LAST]
- SortExec: expr=[b@0 ASC NULLS LAST]
- ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
- AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
- ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
- BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
- SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
- UnionExec
- ProjectionExec: expr=[1 as a, aa as b]
- EmptyExec: produce_one_row=true
- ProjectionExec: expr=[3 as a, aa as b]
- EmptyExec: produce_one_row=true
- ProjectionExec: expr=[5 as a, bb as b]
- EmptyExec: produce_one_row=true
- ProjectionExec: expr=[7 as a, bb as b]
- EmptyExec: produce_one_row=true
+ ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
+ AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
+ ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
+ UnionExec
+ ProjectionExec: expr=[1 as a, aa as b]
+ EmptyExec: produce_one_row=true
+ ProjectionExec: expr=[3 as a, aa as b]
+ EmptyExec: produce_one_row=true
+ ProjectionExec: expr=[5 as a, bb as b]
+ EmptyExec: produce_one_row=true
+ ProjectionExec: expr=[7 as a, bb as b]
+ EmptyExec: produce_one_row=true
# check actual result
diff --git a/datafusion/core/tests/window_fuzz.rs b/datafusion/core/tests/window_fuzz.rs
index e8524304a3..77b6e0a5d1 100644
--- a/datafusion/core/tests/window_fuzz.rs
+++ b/datafusion/core/tests/window_fuzz.rs
@@ -480,6 +480,10 @@ async fn run_window_test(
let collected_running = collect(running_window_exec, task_ctx.clone())
.await
.unwrap();
+
+    // BoundedWindowAggExec should produce more chunks than the usual WindowAggExec.
+    // Otherwise it means that we cannot generate results in running mode.
+ assert!(collected_running.len() > collected_usual.len());
// compare
let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
let running_formatted = pretty_format_batches(&collected_running)