Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/27 14:13:10 UTC

[arrow-datafusion] branch main updated: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new aec34205e9 Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
aec34205e9 is described below

commit aec34205e94728b6975551933e99c1d874806a13
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Apr 27 17:13:01 2023 +0300

    Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) (#6124)
    
    * add starting code for experimenting
    
    * stream group by linear implementation
    
    * sorted implementation
    
    * minor changes
    
    * simplifications
    
    * Simplifications
    
    * convert vec to Option
    
    * minor changes
    
    * minor changes
    
    * minor changes
    
    * simplifications
    
    * minor changes
    
    * all tests pass
    
    * refactor
    
    * simplifications
    
    * remove unnecessary code
    
    * simplifications
    
    * minor changes
    
    * simplifications
    
    * minor changes
    
    * Simplify the GroupByOrderMode type
    
    * Address reviews
    
    * separate fully ordered case and remaining cases
    
    * change test data type
    
    * address reviews
    
    * Convert to option
    
    * retract back to old API.
    
    * Code quality: stylistic changes
    
    * Separate bounded stream and hash stream
    
    * Update comments
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../{row_hash.rs => bounded_aggregate_stream.rs}   | 518 ++++++++++++++-------
 .../core/src/physical_plan/aggregates/mod.rs       | 291 ++++++++++--
 .../core/src/physical_plan/aggregates/row_hash.rs  | 140 +-----
 .../core/src/physical_plan/aggregates/utils.rs     | 151 ++++++
 datafusion/core/src/physical_plan/analyze.rs       |   2 +-
 .../core/src/physical_plan/coalesce_batches.rs     |   2 +-
 .../core/src/physical_plan/coalesce_partitions.rs  |   2 +-
 datafusion/core/src/physical_plan/filter.rs        |   2 +-
 .../core/src/physical_plan/joins/cross_join.rs     |   2 +-
 .../core/src/physical_plan/joins/hash_join.rs      |   2 +-
 .../src/physical_plan/joins/symmetric_hash_join.rs |   5 +-
 datafusion/core/src/physical_plan/mod.rs           |   2 +-
 datafusion/core/src/physical_plan/projection.rs    |   2 +-
 .../core/src/physical_plan/repartition/mod.rs      |   2 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    |   2 +-
 datafusion/core/src/physical_plan/union.rs         |   4 +-
 datafusion/core/src/physical_plan/unnest.rs        |   2 +-
 .../windows/bounded_window_agg_exec.rs             |   3 +-
 .../src/physical_plan/windows/window_agg_exec.rs   |   2 +-
 datafusion/core/tests/aggregate_fuzz.rs            | 222 +++++++++
 datafusion/core/tests/sql/group_by.rs              |  97 ++++
 .../core/tests/sqllogictests/test_files/window.slt |  33 +-
 datafusion/core/tests/window_fuzz.rs               |   4 +
 23 files changed, 1118 insertions(+), 374 deletions(-)
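
Before diving into the diff, a note on the core idea: when the GROUP BY columns form a
prefix of the input ordering, a group is complete as soon as a row with a different value
for that ordered prefix arrives, so its result can be emitted and its state pruned without
waiting for the end of the input. Below is a minimal, self-contained Rust sketch of that
emission rule; the `Group` and `Status` types are illustrative stand-ins, not the
`BoundedAggregateStream` API introduced in the diff.

    // Minimal sketch: emit groups whose ordered key can no longer change.
    #[derive(Debug, PartialEq)]
    enum Status {
        InProgress,
        CanEmit,
    }

    struct Group {
        ordered_key: Vec<i32>, // values of the ordered GROUP BY columns
        count: u64,            // stand-in for accumulator state
        status: Status,
    }

    /// Every group whose ordered key differs from the last (largest) key seen so
    /// far cannot receive further rows, so it may be emitted immediately.
    fn mark_emittable(groups: &mut [Group]) {
        if let Some(last_key) = groups.last().map(|g| g.ordered_key.clone()) {
            for g in groups.iter_mut() {
                if g.ordered_key != last_key {
                    g.status = Status::CanEmit;
                }
            }
        }
    }

    fn main() {
        let mut groups = vec![
            Group { ordered_key: vec![1], count: 3, status: Status::InProgress },
            Group { ordered_key: vec![2], count: 5, status: Status::InProgress },
            Group { ordered_key: vec![3], count: 1, status: Status::InProgress },
        ];
        mark_emittable(&mut groups);
        assert_eq!(groups[0].status, Status::CanEmit);    // result can stream out
        assert_eq!(groups[2].status, Status::InProgress); // may still receive rows
    }

In the committed code this corresponds to setting `GroupStatus::CanEmit` for every group
whose `ordered_columns` differ from those of the most recently created group (see
`bounded_aggregate_stream.rs` below).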

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
similarity index 66%
copy from datafusion/core/src/physical_plan/aggregates/row_hash.rs
copy to datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index bf1846ae98..127821de45 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Hash aggregation through row format
+//! Hash aggregation on ordered GROUP BY expressions.
+//! The output generated will itself have an ordering, and the executor can run
+//! with bounded memory (it can generate results in streaming scenarios).
 
 use std::cmp::min;
 use std::ops::Range;
@@ -24,35 +26,38 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use arrow::row::{OwnedRow, RowConverter, SortField};
-use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
+use hashbrown::raw::RawTable;
+use itertools::izip;
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::aggregates::{
-    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
-    AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
+    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+    AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use arrow::array::*;
-use arrow::compute::{cast, filter};
-use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
+
+use crate::physical_plan::aggregates::utils::{
+    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
+    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
+};
+use arrow::array::{new_null_array, ArrayRef, UInt32Builder};
+use arrow::compute::{cast, SortColumn};
+use arrow::datatypes::DataType;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
+use datafusion_physical_expr::hash_utils::create_hashes;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
-use hashbrown::raw::RawTable;
-use itertools::izip;
 
 /// Grouping aggregate with row-format aggregation states inside.
 ///
@@ -70,7 +75,7 @@ use itertools::izip;
 ///
 /// [Arrow-row]: OwnedRow
 /// [WordAligned]: datafusion_row::layout
-pub(crate) struct GroupedHashAggregateStream {
+pub(crate) struct BoundedAggregateStream {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
@@ -105,26 +110,12 @@ pub(crate) struct GroupedHashAggregateStream {
     /// first element in the array corresponds to normal accumulators
     /// second element in the array corresponds to row accumulators
     indices: [Vec<Range<usize>>; 2],
+    aggregation_ordering: AggregationOrdering,
+    is_end: bool,
 }
 
-#[derive(Debug)]
-/// tracks what phase the aggregation is in
-enum ExecutionState {
-    ReadingInput,
-    ProducingOutput,
-    Done,
-}
-
-fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
-    let fields = aggr_expr
-        .iter()
-        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
-        .collect::<Vec<_>>();
-    Ok(Arc::new(Schema::new(fields)))
-}
-
-impl GroupedHashAggregateStream {
-    /// Create a new GroupedHashAggregateStream
+impl BoundedAggregateStream {
+    /// Create a new BoundedAggregateStream
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         mode: AggregateMode,
@@ -137,6 +128,8 @@ impl GroupedHashAggregateStream {
         batch_size: usize,
         context: Arc<TaskContext>,
         partition: usize,
+        // Stores algorithm mode and output ordering
+        aggregation_ordering: AggregationOrdering,
     ) -> Result<Self> {
         let timer = baseline_metrics.elapsed_compute().timer();
 
@@ -205,18 +198,18 @@ impl GroupedHashAggregateStream {
 
         let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
 
-        let name = format!("GroupedHashAggregateStream[{partition}]");
+        let name = format!("BoundedAggregateStream[{partition}]");
         let aggr_state = AggregationState {
             reservation: MemoryConsumer::new(name).register(context.memory_pool()),
             map: RawTable::with_capacity(0),
-            group_states: Vec::with_capacity(0),
+            ordered_group_states: Vec::with_capacity(0),
         };
 
         timer.done();
 
         let exec_state = ExecutionState::ReadingInput;
 
-        Ok(GroupedHashAggregateStream {
+        Ok(BoundedAggregateStream {
             schema: Arc::clone(&schema),
             input,
             mode,
@@ -237,11 +230,13 @@ impl GroupedHashAggregateStream {
             batch_size,
             row_group_skip_position: 0,
             indices: [normal_agg_indices, row_agg_indices],
+            is_end: false,
+            aggregation_ordering,
         })
     }
 }
 
-impl Stream for GroupedHashAggregateStream {
+impl Stream for BoundedAggregateStream {
     type Item = Result<RecordBatch>;
 
     fn poll_next(
@@ -275,6 +270,10 @@ impl Stream for GroupedHashAggregateStream {
                         Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                         // inner is done, producing output
                         None => {
+                            for element in self.aggr_state.ordered_group_states.iter_mut()
+                            {
+                                element.status = GroupStatus::CanEmit;
+                            }
                             self.exec_state = ExecutionState::ProducingOutput;
                         }
                     }
@@ -285,12 +284,14 @@ impl Stream for GroupedHashAggregateStream {
                     let result = self.create_batch_from_map();
 
                     timer.done();
-                    self.row_group_skip_position += self.batch_size;
 
                     match result {
                         // made output
                         Ok(Some(result)) => {
                             let batch = result.record_output(&self.baseline_metrics);
+                            self.row_group_skip_position += batch.num_rows();
+                            self.exec_state = ExecutionState::ReadingInput;
+                            self.prune();
                             return Poll::Ready(Some(Ok(batch)));
                         }
                         // end of output
@@ -307,21 +308,141 @@ impl Stream for GroupedHashAggregateStream {
     }
 }
 
-impl RecordBatchStream for GroupedHashAggregateStream {
+impl RecordBatchStream for BoundedAggregateStream {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
 }
 
-impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+/// This utility object encapsulates the row object, its hash value and the
+/// range of row indices for a group. This information is used when executing
+/// streaming GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
+impl BoundedAggregateStream {
+    // Update the aggr_state according to group_by values (result of group_by_expressions) when group by
+    // expressions are fully ordered.
+    fn update_ordered_group_state(
+        &mut self,
+        group_values: &[ArrayRef],
+        per_group_indices: Vec<GroupOrderInfo>,
+        allocated: &mut usize,
+    ) -> Result<Vec<usize>> {
+        // 1.1 construct the key from the group values
+        // 1.2 construct the mapping key if it does not exist
+        // 1.3 add the row's index to `indices`
+
+        // track which entries in `aggr_state` have rows in this batch to aggregate
+        let mut groups_with_rows = vec![];
+
+        let AggregationState {
+            map: row_map,
+            ordered_group_states: row_group_states,
+            ..
+        } = &mut self.aggr_state;
+
+        for GroupOrderInfo {
+            owned_row,
+            hash,
+            range,
+        } in per_group_indices
+        {
+            let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
+                // verify that a group that we are inserting with hash is
+                // actually the same key value as the group in
+                // existing_idx  (aka group_values @ row)
+                let ordered_group_state = &row_group_states[*group_idx];
+                let group_state = &ordered_group_state.group_state;
+                owned_row.row() == group_state.group_by_values.row()
+            });
+
+            match entry {
+                // Existing entry for this group value
+                Some((_hash, group_idx)) => {
+                    let group_state = &mut row_group_states[*group_idx].group_state;
+
+                    // 1.3
+                    if group_state.indices.is_empty() {
+                        groups_with_rows.push(*group_idx);
+                    };
+                    for row in range.start..range.end {
+                        // remember this row
+                        group_state.indices.push_accounted(row as u32, allocated);
+                    }
+                }
+                //  1.2 Need to create new entry
+                None => {
+                    let accumulator_set =
+                        aggregates::create_accumulators(&self.normal_aggr_expr)?;
+                    let row = get_row_at_idx(group_values, range.start)?;
+                    let ordered_columns = self
+                        .aggregation_ordering
+                        .order_indices
+                        .iter()
+                        .map(|idx| row[*idx].clone())
+                        .collect::<Vec<_>>();
+                    // Add new entry to group_states and save newly created index
+                    let group_state = GroupState {
+                        group_by_values: owned_row,
+                        aggregation_buffer: vec![
+                            0;
+                            self.row_aggr_layout.fixed_part_width()
+                        ],
+                        accumulator_set,
+                        indices: (range.start as u32..range.end as u32)
+                            .collect::<Vec<_>>(), // 1.3
+                    };
+                    let group_idx = row_group_states.len();
+
+                    // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
+                    // `group_states` (see allocation down below)
+                    *allocated += (std::mem::size_of::<u8>()
+                        * group_state.group_by_values.as_ref().len())
+                        + (std::mem::size_of::<u8>()
+                            * group_state.aggregation_buffer.capacity())
+                        + (std::mem::size_of::<u32>() * group_state.indices.capacity());
+
+                    // Allocation done by normal accumulators
+                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
+                        * group_state.accumulator_set.capacity())
+                        + group_state
+                            .accumulator_set
+                            .iter()
+                            .map(|accu| accu.size())
+                            .sum::<usize>();
+
+                    // for hasher function, use precomputed hash value
+                    row_map.insert_accounted(
+                        (hash, group_idx),
+                        |(hash, _group_index)| *hash,
+                        allocated,
+                    );
+
+                    let ordered_group_state = OrderedGroupState {
+                        group_state,
+                        ordered_columns,
+                        status: GroupStatus::GroupProgress,
+                        hash,
+                    };
+                    row_group_states.push_accounted(ordered_group_state, allocated);
+
+                    groups_with_rows.push(group_idx);
+                }
+            };
+        }
+        Ok(groups_with_rows)
+    }
+
+    // Update the aggr_state according to group_by values (result of group_by_expressions)
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
         allocated: &mut usize,
     ) -> Result<Vec<usize>> {
-        let group_rows = self.row_converter.convert_columns(group_values)?;
-        let n_rows = group_rows.num_rows();
         // 1.1 construct the key from the group values
         // 1.2 construct the mapping key if it does not exist
         // 1.3 add the row' index to `indices`
@@ -329,12 +450,16 @@ impl GroupedHashAggregateStream {
         // track which entries in `aggr_state` have rows in this batch to aggregate
         let mut groups_with_rows = vec![];
 
+        let group_rows = self.row_converter.convert_columns(group_values)?;
+        let n_rows = group_rows.num_rows();
         // 1.1 Calculate the group keys for the group values
         let mut batch_hashes = vec![0; n_rows];
         create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
 
         let AggregationState {
-            map, group_states, ..
+            map,
+            ordered_group_states: group_states,
+            ..
         } = &mut self.aggr_state;
 
         for (row, hash) in batch_hashes.into_iter().enumerate() {
@@ -342,14 +467,14 @@ impl GroupedHashAggregateStream {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
-                let group_state = &group_states[*group_idx];
+                let group_state = &group_states[*group_idx].group_state;
                 group_rows.row(row) == group_state.group_by_values.row()
             });
 
             match entry {
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
-                    let group_state = &mut group_states[*group_idx];
+                    let group_state = &mut group_states[*group_idx].group_state;
 
                     // 1.3
                     if group_state.indices.is_empty() {
@@ -362,7 +487,13 @@ impl GroupedHashAggregateStream {
                 None => {
                     let accumulator_set =
                         aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                    // Add new entry to group_states and save newly created index
+                    let row_values = get_row_at_idx(group_values, row)?;
+                    let ordered_columns = self
+                        .aggregation_ordering
+                        .order_indices
+                        .iter()
+                        .map(|idx| row_values[*idx].clone())
+                        .collect::<Vec<_>>();
                     let group_state = GroupState {
                         group_by_values: group_rows.row(row).owned(),
                         aggregation_buffer: vec![
@@ -398,7 +529,14 @@ impl GroupedHashAggregateStream {
                         allocated,
                     );
 
-                    group_states.push_accounted(group_state, allocated);
+                    // Add new entry to group_states and save newly created index
+                    let ordered_group_state = OrderedGroupState {
+                        group_state,
+                        ordered_columns,
+                        status: GroupStatus::GroupProgress,
+                        hash,
+                    };
+                    group_states.push_accounted(ordered_group_state, allocated);
 
                     groups_with_rows.push(group_idx);
                 }
@@ -407,7 +545,7 @@ impl GroupedHashAggregateStream {
         Ok(groups_with_rows)
     }
 
-    // Update the accumulator results, according to row_aggr_state.
+    // Update the accumulator results, according to aggr_state.
     #[allow(clippy::too_many_arguments)]
     fn update_accumulators_using_batch(
         &mut self,
@@ -428,7 +566,8 @@ impl GroupedHashAggregateStream {
             .iter()
             .zip(offsets.windows(2))
             .try_for_each(|(group_idx, offsets)| {
-                let group_state = &mut self.aggr_state.group_states[*group_idx];
+                let group_state =
+                    &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
                 // 2.2
                 // Process row accumulators
                 self.row_accumulators
@@ -490,7 +629,7 @@ impl GroupedHashAggregateStream {
         Ok(())
     }
 
-    // Update the accumulator results, according to row_aggr_state.
+    // Update the accumulator results, according to aggr_state.
     fn update_accumulators_using_scalar(
         &mut self,
         groups_with_rows: &[usize],
@@ -506,7 +645,8 @@ impl GroupedHashAggregateStream {
             .collect::<Result<Vec<_>>>()?;
 
         for group_idx in groups_with_rows {
-            let group_state = &mut self.aggr_state.group_states[*group_idx];
+            let group_state =
+                &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
             let mut state_accessor =
                 RowAccessor::new_from_layout(self.row_aggr_layout.clone());
             state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
@@ -562,8 +702,48 @@ impl GroupedHashAggregateStream {
 
         let row_converter_size_pre = self.row_converter.size();
         for group_values in &group_by_values {
-            let groups_with_rows =
-                self.update_group_state(group_values, &mut allocated)?;
+            let groups_with_rows = if let AggregationOrdering {
+                mode: GroupByOrderMode::Ordered,
+                order_indices,
+                ordering,
+            } = &self.aggregation_ordering
+            {
+                let group_rows = self.row_converter.convert_columns(group_values)?;
+                let n_rows = group_rows.num_rows();
+                // 1.1 Calculate the group keys for the group values
+                let mut batch_hashes = vec![0; n_rows];
+                create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+                let sort_column = order_indices
+                    .iter()
+                    .enumerate()
+                    .map(|(idx, cur_idx)| SortColumn {
+                        values: group_values[*cur_idx].clone(),
+                        options: Some(ordering[idx].options),
+                    })
+                    .collect::<Vec<_>>();
+                let n_rows = group_rows.num_rows();
+                let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
+                let per_group_indices = ranges
+                    .into_iter()
+                    .map(|range| {
+                        let row = group_rows.row(range.start).owned();
+                        // (row, batch_hashes[range.start], range)
+                        GroupOrderInfo {
+                            owned_row: row,
+                            hash: batch_hashes[range.start],
+                            range,
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                self.update_ordered_group_state(
+                    group_values,
+                    per_group_indices,
+                    &mut allocated,
+                )?
+            } else {
+                self.update_group_state(group_values, &mut allocated)?
+            };
+
             // Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet:
             // 1) The aggregation mode is Partial or Single
             // 2) There is not normal aggregation expressions
@@ -580,58 +760,97 @@ impl GroupedHashAggregateStream {
                 )?;
             } else {
                 // Collect all indices + offsets based on keys in this vec
-                let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
+                let mut batch_indices = UInt32Builder::with_capacity(0);
                 let mut offsets = vec![0];
                 let mut offset_so_far = 0;
                 for &group_idx in groups_with_rows.iter() {
-                    let indices = &self.aggr_state.group_states[group_idx].indices;
+                    let indices = &self.aggr_state.ordered_group_states[group_idx]
+                        .group_state
+                        .indices;
                     batch_indices.append_slice(indices);
                     offset_so_far += indices.len();
                     offsets.push(offset_so_far);
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self.aggregation_ordering.mode == GroupByOrderMode::Ordered {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        let mut new_result = false;
+        let last_ordered_columns = self
+            .aggr_state
+            .ordered_group_states
+            .last()
+            .map(|item| item.ordered_columns.clone());
+
+        if let Some(last_ordered_columns) = last_ordered_columns {
+            for cur_group in &mut self.aggr_state.ordered_group_states {
+                if cur_group.ordered_columns != last_ordered_columns {
+                    // We will no longer receive values for this group. Set its status
+                    // to GroupStatus::CanEmit, meaning we can generate its result.
+                    cur_group.status = GroupStatus::CanEmit;
+                    new_result = true;
+                }
+            }
+        }
+        if new_result {
+            self.exec_state = ExecutionState::ProducingOutput;
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `GroupProgress` means data for the current group is not complete; new data may arrive.
+    GroupProgress,
+    // `CanEmit` means data for the current group is complete, so its result can be emitted.
+    CanEmit,
+    // `Emitted` means the group's result has been output; the group can be pruned from the state.
+    Emitted,
+}
+
 /// The state that is built for each output group.
 #[derive(Debug)]
-pub struct GroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: OwnedRow,
-
-    // Accumulator state, stored sequentially
-    pub aggregation_buffer: Vec<u8>,
-
-    // Accumulator state, one for each aggregate that doesn't support row accumulation
-    pub accumulator_set: Vec<AccumulatorItem>,
-
-    /// scratch space used to collect indices for input rows in a
-    /// bach that have values to aggregate. Reset on each batch
-    pub indices: Vec<u32>,
+pub struct OrderedGroupState {
+    group_state: GroupState,
+    ordered_columns: Vec<ScalarValue>,
+    status: GroupStatus,
+    hash: u64,
 }
 
 /// The state of all the groups
@@ -648,7 +867,7 @@ pub struct AggregationState {
     pub map: RawTable<(u64, usize)>,
 
     /// State for each group
-    pub group_states: Vec<GroupState>,
+    pub ordered_group_states: Vec<OrderedGroupState>,
 }
 
 impl std::fmt::Debug for AggregationState {
@@ -657,28 +876,54 @@ impl std::fmt::Debug for AggregationState {
         let map_string = "RawTable";
         f.debug_struct("AggregationState")
             .field("map", &map_string)
-            .field("group_states", &self.group_states)
+            .field("ordered_group_states", &self.ordered_group_states)
             .finish()
     }
 }
 
-impl GroupedHashAggregateStream {
+impl BoundedAggregateStream {
+    /// Prune the groups in `self.aggr_state.ordered_group_states` that are in the
+    /// `GroupStatus::Emitted` state. Results of such groups have already been emitted,
+    /// and we are sure that they cannot receive new rows.
+    fn prune(&mut self) {
+        let n_partition = self.aggr_state.ordered_group_states.len();
+        self.aggr_state
+            .ordered_group_states
+            .retain(|elem| elem.status != GroupStatus::Emitted);
+        let n_partition_new = self.aggr_state.ordered_group_states.len();
+        let n_pruned = n_partition - n_partition_new;
+        self.aggr_state.map.clear();
+        for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() {
+            self.aggr_state
+                .map
+                .insert(item.hash, (item.hash, idx), |(hash, _)| *hash);
+        }
+        self.row_group_skip_position -= n_pruned;
+    }
+
     /// Create a RecordBatch with all group keys and accumulator' states or values.
     fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
         let skip_items = self.row_group_skip_position;
-        if skip_items > self.aggr_state.group_states.len() {
+        if skip_items > self.aggr_state.ordered_group_states.len() || self.is_end {
             return Ok(None);
         }
-        if self.aggr_state.group_states.is_empty() {
+        self.is_end |= skip_items == self.aggr_state.ordered_group_states.len();
+        if self.aggr_state.ordered_group_states.is_empty() {
             let schema = self.schema.clone();
             return Ok(Some(RecordBatch::new_empty(schema)));
         }
 
         let end_idx = min(
             skip_items + self.batch_size,
-            self.aggr_state.group_states.len(),
+            self.aggr_state.ordered_group_states.len(),
         );
-        let group_state_chunk = &self.aggr_state.group_states[skip_items..end_idx];
+        let group_state_chunk =
+            &self.aggr_state.ordered_group_states[skip_items..end_idx];
+        // Consider only the groups that can be emitted (the ones we are sure will not receive new entries).
+        let group_state_chunk = group_state_chunk
+            .iter()
+            .filter(|item| item.status == GroupStatus::CanEmit)
+            .collect::<Vec<_>>();
 
         if group_state_chunk.is_empty() {
             let schema = self.schema.clone();
@@ -688,7 +933,7 @@ impl GroupedHashAggregateStream {
         // Buffers for each distinct group (i.e. row accumulator memories)
         let mut state_buffers = group_state_chunk
             .iter()
-            .map(|gs| gs.aggregation_buffer.clone())
+            .map(|gs| gs.group_state.aggregation_buffer.clone())
             .collect::<Vec<_>>();
 
         let output_fields = self.schema.fields();
@@ -734,7 +979,7 @@ impl GroupedHashAggregateStream {
                 let current = match self.mode {
                     AggregateMode::Partial => ScalarValue::iter_to_array(
                         group_state_chunk.iter().map(|group_state| {
-                            group_state.accumulator_set[idx]
+                            group_state.group_state.accumulator_set[idx]
                                 .state()
                                 .map(|v| v[field_idx].clone())
                                 .expect("Unexpected accumulator state in hash aggregate")
@@ -744,7 +989,7 @@ impl GroupedHashAggregateStream {
                     | AggregateMode::FinalPartitioned
                     | AggregateMode::Single => ScalarValue::iter_to_array(
                         group_state_chunk.iter().map(|group_state| {
-                            group_state.accumulator_set[idx]
+                            group_state.group_state.accumulator_set[idx]
                                 .evaluate()
                                 .expect("Unexpected accumulator state in hash aggregate")
                         }),
@@ -761,7 +1006,7 @@ impl GroupedHashAggregateStream {
         // Stores the group by fields
         let group_buffers = group_state_chunk
             .iter()
-            .map(|gs| gs.group_by_values.row())
+            .map(|gs| gs.group_state.group_by_values.row())
             .collect::<Vec<_>>();
         let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(group_buffers)?;
 
@@ -785,91 +1030,14 @@ impl GroupedHashAggregateStream {
                 }
             }
         }
-        Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
-    }
-}
-
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
-    let row_num = rows.len();
-    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
-    let mut row = RowReader::new(schema);
-
-    for data in rows {
-        row.point_to(0, data);
-        read_row(&row, &mut output, schema);
-    }
-
-    output.output_as_columns()
-}
-
-fn get_at_indices(
-    input_values: &[Vec<ArrayRef>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    input_values
-        .iter()
-        .map(|array| get_arrayref_at_indices(array, batch_indices))
-        .collect()
-}
-
-fn get_optional_filters(
-    original_values: &[Option<Arc<dyn Array>>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
-    original_values
-        .iter()
-        .map(|array| {
-            array.as_ref().map(|array| {
-                compute::take(
-                    array.as_ref(),
-                    batch_indices,
-                    None, // None: no index check
-                )
-                .unwrap()
-            })
-        })
-        .collect()
-}
 
-fn slice_and_maybe_filter(
-    aggr_array: &[ArrayRef],
-    filter_opt: Option<&Arc<dyn Array>>,
-    offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
-    let sliced_arrays: Vec<ArrayRef> = aggr_array
-        .iter()
-        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
-        .collect();
-
-    let filtered_arrays = match filter_opt.as_ref() {
-        Some(f) => {
-            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
-            let filter_array = as_boolean_array(&sliced)?;
-
-            sliced_arrays
-                .iter()
-                .map(|array| filter(array, filter_array).unwrap())
-                .collect::<Vec<ArrayRef>>()
+        // Set the status of the emitted groups to GroupStatus::Emitted.
+        for gs in self.aggr_state.ordered_group_states[skip_items..end_idx].iter_mut() {
+            if gs.status == GroupStatus::CanEmit {
+                gs.status = GroupStatus::Emitted;
+            }
         }
-        None => sliced_arrays,
-    };
-    Ok(filtered_arrays)
-}
 
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
-fn col_to_scalar(
-    array: &ArrayRef,
-    filter: &Option<&BooleanArray>,
-    row_index: usize,
-) -> Result<ScalarValue> {
-    if array.is_null(row_index) {
-        return Ok(ScalarValue::Null);
-    }
-    if let Some(filter) = filter {
-        if !filter.value(row_index) {
-            return Ok(ScalarValue::Null);
-        }
+        Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
-    ScalarValue::try_from_array(array, row_index)
 }
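
As an aside on the `Ordered` code path above: `update_ordered_group_state` operates on
contiguous row ranges, one per distinct group key, which `evaluate_partition_ranges`
derives from the already-sorted group columns. A simplified, hypothetical version of that
range computation over a plain slice (instead of Arrow arrays and `SortColumn`s) looks
like this:

    use std::ops::Range;

    /// Split already-sorted keys into contiguous ranges of equal values.
    /// `sorted_keys` stands in for the sorted GROUP BY column(s) of one batch.
    fn partition_ranges(sorted_keys: &[i32]) -> Vec<Range<usize>> {
        let mut ranges = Vec::new();
        let mut start = 0;
        for i in 1..sorted_keys.len() {
            if sorted_keys[i] != sorted_keys[i - 1] {
                ranges.push(start..i);
                start = i;
            }
        }
        if !sorted_keys.is_empty() {
            ranges.push(start..sorted_keys.len());
        }
        ranges
    }

    fn main() {
        // A batch sorted on the GROUP BY column: three groups -> three ranges.
        let keys = [1, 1, 2, 2, 2, 5];
        assert_eq!(partition_ranges(&keys), vec![0..2, 2..5, 5..6]);
    }

Each resulting range maps to one `GroupOrderInfo` (the first row of the range, its
precomputed hash, and the range itself), so the hash table is probed once per group
rather than once per input row.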
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 57525b387f..055f60346d 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -18,39 +18,43 @@
 //! Aggregates functionalities
 
 use crate::execution::context::TaskContext;
-use crate::physical_plan::aggregates::no_grouping::AggregateStream;
+use crate::physical_plan::aggregates::{
+    bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
+    row_hash::GroupedHashAggregateStream,
+};
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
 use arrow::array::ArrayRef;
+use arrow::compute::DEFAULT_CAST_OPTIONS;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
-use datafusion_physical_expr::expressions::{Avg, CastExpr, Column, Sum};
 use datafusion_physical_expr::{
-    expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr,
+    aggregate::row_accumulator::RowAccumulator,
+    equivalence::project_equivalence_properties,
+    expressions::{Avg, CastExpr, Column, Sum},
+    normalize_out_expr_with_columns_map,
+    utils::{convert_to_expr, get_indices_of_matching_exprs},
+    AggregateExpr, PhysicalExpr, PhysicalSortExpr,
 };
 use std::any::Any;
 use std::collections::HashMap;
-
-use arrow::compute::DEFAULT_CAST_OPTIONS;
 use std::sync::Arc;
 
+mod bounded_aggregate_stream;
 mod no_grouping;
 mod row_hash;
+mod utils;
 
-use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStream;
-use crate::physical_plan::EquivalenceProperties;
 pub use datafusion_expr::AggregateFunction;
-use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
-use datafusion_physical_expr::equivalence::project_equivalence_properties;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::normalize_out_expr_with_columns_map;
 
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -72,6 +76,21 @@ pub enum AggregateMode {
     Single,
 }
 
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// Some of the expressions in the GROUP BY clause have an ordering.
+    // For example, if the input is ordered by a, b, c, d and we group by b, a, d;
+    // the mode will be `PartiallyOrdered`, meaning a subset of the GROUP BY
+    // expressions b, a, d (here, a and b) defines a prefix of the existing ordering.
+    PartiallyOrdered,
+    /// All the expressions in the GROUP BY clause have orderings.
+    // For example, if the input is ordered by a, b, c, d and we group by b, a;
+    // the mode will be `Ordered`, meaning all of the GROUP BY expressions b, a
+    // define a prefix of the existing ordering (a, b).
+    Ordered,
+}
+
 /// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)
 /// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b]
 /// and a single group [false, false].
@@ -173,6 +192,7 @@ impl PartialEq for PhysicalGroupBy {
 enum StreamType {
     AggregateStream(AggregateStream),
     GroupedHashAggregateStream(GroupedHashAggregateStream),
+    BoundedAggregate(BoundedAggregateStream),
 }
 
 impl From<StreamType> for SendableRecordBatchStream {
@@ -180,10 +200,20 @@ impl From<StreamType> for SendableRecordBatchStream {
         match stream {
             StreamType::AggregateStream(stream) => Box::pin(stream),
             StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
+            StreamType::BoundedAggregate(stream) => Box::pin(stream),
         }
     }
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct AggregationOrdering {
+    mode: GroupByOrderMode,
+    /// Stores indices such that when we iterate with these indices, GROUP BY
+    /// expressions match input ordering.
+    order_indices: Vec<usize>,
+    ordering: Vec<PhysicalSortExpr>,
+}
+
 /// Hash aggregate execution plan
 #[derive(Debug)]
 pub struct AggregateExec {
@@ -208,6 +238,99 @@ pub struct AggregateExec {
     columns_map: HashMap<Column, Vec<Column>>,
     /// Execution Metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Stores mode, and output ordering information for the `AggregateExec`.
+    aggregation_ordering: Option<AggregationOrdering>,
+}
+
+/// Calculates the working mode for `GROUP BY` queries.
+/// - If no GROUP BY expression has an ordering, returns `None`.
+/// - If some GROUP BY expressions have an ordering, returns `Some(GroupByOrderMode::PartiallyOrdered)`.
+/// - If all GROUP BY expressions have orderings, returns `Some(GroupByOrderMode::Ordered)`.
+fn get_working_mode(
+    input: &Arc<dyn ExecutionPlan>,
+    group_by: &PhysicalGroupBy,
+) -> Option<(GroupByOrderMode, Vec<usize>)> {
+    if group_by.groups.len() > 1 {
+        // We do not currently support streaming execution if we have more
+        // than one group (e.g. we have grouping sets).
+        return None;
+    };
+
+    let output_ordering = input.output_ordering().unwrap_or(&[]);
+    // Since direction of the ordering is not important for GROUP BY columns,
+    // we convert PhysicalSortExpr to PhysicalExpr in the existing ordering.
+    let ordering_exprs = convert_to_expr(output_ordering);
+    let groupby_exprs = group_by
+        .expr
+        .iter()
+        .map(|(item, _)| item.clone())
+        .collect::<Vec<_>>();
+    // Find where each expression of the GROUP BY clause occurs in the existing
+    // ordering (if it occurs):
+    let mut ordered_indices =
+        get_indices_of_matching_exprs(&groupby_exprs, &ordering_exprs, || {
+            input.equivalence_properties()
+        });
+    ordered_indices.sort();
+    // Find out how many expressions of the existing ordering define ordering
+    // for expressions in the GROUP BY clause. For example, if the input is
+    // ordered by a, b, c, d and we group by b, a, d; the result below would be
+    // 2, meaning 2 elements (a, b) among the GROUP BY columns define ordering.
+    let first_n = longest_consecutive_prefix(ordered_indices);
+    if first_n == 0 {
+        // No GROUP BY columns are ordered, so we cannot do streaming execution.
+        return None;
+    }
+    let ordered_exprs = ordering_exprs[0..first_n].to_vec();
+    // Find indices for the GROUP BY expressions such that when we iterate with
+    // these indices, we would match existing ordering. For the example above,
+    // this would produce 1, 0; meaning 1st and 0th entries (a, b) among the
+    // GROUP BY expressions b, a, d match input ordering.
+    let ordered_group_by_indices =
+        get_indices_of_matching_exprs(&ordered_exprs, &groupby_exprs, || {
+            input.equivalence_properties()
+        });
+    Some(if first_n == group_by.expr.len() {
+        (GroupByOrderMode::Ordered, ordered_group_by_indices)
+    } else {
+        (GroupByOrderMode::PartiallyOrdered, ordered_group_by_indices)
+    })
+}
+
+fn calc_aggregation_ordering(
+    input: &Arc<dyn ExecutionPlan>,
+    group_by: &PhysicalGroupBy,
+) -> Option<AggregationOrdering> {
+    get_working_mode(input, group_by).map(|(mode, order_indices)| {
+        let existing_ordering = input.output_ordering().unwrap_or(&[]);
+        let out_group_expr = output_group_expr_helper(group_by);
+        // Calculate output ordering information for the operator:
+        let out_ordering = order_indices
+            .iter()
+            .zip(existing_ordering)
+            .map(|(idx, input_col)| PhysicalSortExpr {
+                expr: out_group_expr[*idx].clone(),
+                options: input_col.options,
+            })
+            .collect::<Vec<_>>();
+        AggregationOrdering {
+            mode,
+            order_indices,
+            ordering: out_ordering,
+        }
+    })
+}
+
+/// Grouping expressions as they occur in the output schema
+fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalExpr>> {
+    // Update column indices. Since the GROUP BY columns come first in the output schema,
+    // their indices are simply 0..group_by.expr().len().
+    group_by
+        .expr()
+        .iter()
+        .enumerate()
+        .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _)
+        .collect()
 }
 
 impl AggregateExec {
@@ -240,6 +363,8 @@ impl AggregateExec {
             };
         }
 
+        let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
+
         Ok(AggregateExec {
             mode,
             group_by,
@@ -250,6 +375,7 @@ impl AggregateExec {
             input_schema,
             columns_map,
             metrics: ExecutionPlanMetricsSet::new(),
+            aggregation_ordering,
         })
     }
 
@@ -265,16 +391,7 @@ impl AggregateExec {
 
     /// Grouping expressions as they occur in the output schema
     pub fn output_group_expr(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        // Update column indices. Since the group by columns come first in the output schema, their
-        // indices are simply 0..self.group_expr(len).
-        self.group_by
-            .expr()
-            .iter()
-            .enumerate()
-            .map(|(index, (_col, name))| {
-                Arc::new(expressions::Column::new(name, index)) as Arc<dyn PhysicalExpr>
-            })
-            .collect()
+        output_group_expr_helper(&self.group_by)
     }
 
     /// Aggregate expressions
@@ -305,6 +422,7 @@ impl AggregateExec {
         let batch_size = context.session_config().batch_size();
         let input = self.input.execute(partition, Arc::clone(&context))?;
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
         if self.group_by.expr.is_empty() {
             Ok(StreamType::AggregateStream(AggregateStream::new(
                 self.mode,
@@ -316,6 +434,20 @@ impl AggregateExec {
                 context,
                 partition,
             )?))
+        } else if let Some(aggregation_ordering) = &self.aggregation_ordering {
+            Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
+                self.mode,
+                self.schema.clone(),
+                self.group_by.clone(),
+                self.aggr_expr.clone(),
+                self.filter_expr.clone(),
+                input,
+                baseline_metrics,
+                batch_size,
+                context,
+                partition,
+                aggregation_ordering.clone(),
+            )?))
         } else {
             Ok(StreamType::GroupedHashAggregateStream(
                 GroupedHashAggregateStream::new(
@@ -373,20 +505,27 @@ impl ExecutionPlan for AggregateExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.    
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
-            Err(DataFusionError::Plan(
-                "Aggregate Error: `GROUP BY` clause (including the more general GROUPING SET) is not supported for unbounded inputs.".to_string(),
-            ))
+            if self.aggregation_ordering.is_none() {
+                // Cannot run without breaking pipeline.
+                Err(DataFusionError::Plan(
+                    "Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs.".to_string(),
+                ))
+            } else {
+                Ok(true)
+            }
         } else {
             Ok(false)
         }
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.aggregation_ordering
+            .as_ref()
+            .map(|item: &AggregationOrdering| item.ordering.as_slice())
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -801,13 +940,14 @@ mod tests {
     use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::from_slice::FromSlice;
     use crate::physical_plan::aggregates::{
-        AggregateExec, AggregateMode, PhysicalGroupBy,
+        get_working_mode, AggregateExec, AggregateMode, PhysicalGroupBy,
     };
     use crate::physical_plan::expressions::{col, Avg};
-    use crate::test::assert_is_pending;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
+    use crate::test::{assert_is_pending, csv_exec_sorted};
     use crate::{assert_batches_sorted_eq, physical_plan::common};
     use arrow::array::{Float64Array, UInt32Array};
+    use arrow::compute::{concat_batches, SortOptions};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::{DataFusionError, Result, ScalarValue};
@@ -819,6 +959,7 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
@@ -826,6 +967,92 @@ mod tests {
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    /// make PhysicalSortExpr with default options
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// PhysicalSortExpr with specified options
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_working_mode() -> Result<()> {
+        let test_schema = create_test_schema()?;
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST
+        // Columns d and e are not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+        ];
+        let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+        // Test cases consist of a vector of tuples, where each tuple represents a single test case.
+        // The first field in the tuple is a Vec<&str>, where each element represents a GROUP BY column;
+        // for instance, `vec!["a", "b"]` corresponds to GROUP BY a, b.
+        // The second field in the tuple is an Option<GroupByOrderMode>, which corresponds to the
+        // expected algorithm mode. None means the existing ordering is not sufficient to run the
+        // executor with either of the algorithms (we need to add a SortExec to be able to run it).
+        // Some(GroupByOrderMode) means we can run the algorithm with the existing ordering, and it
+        // should work in the given GroupByOrderMode.
+        let test_cases = vec![
+            (vec!["a"], Some((Ordered, vec![0]))),
+            (vec!["b"], None),
+            (vec!["c"], None),
+            (vec!["b", "a"], Some((Ordered, vec![1, 0]))),
+            (vec!["c", "b"], None),
+            (vec!["c", "a"], Some((PartiallyOrdered, vec![1]))),
+            (vec!["c", "b", "a"], Some((Ordered, vec![2, 1, 0]))),
+            (vec!["d", "a"], Some((PartiallyOrdered, vec![1]))),
+            (vec!["d", "b"], None),
+            (vec!["d", "c"], None),
+            (vec!["d", "b", "a"], Some((PartiallyOrdered, vec![2, 1]))),
+            (vec!["d", "c", "b"], None),
+            (vec!["d", "c", "a"], Some((PartiallyOrdered, vec![2]))),
+            (
+                vec!["d", "c", "b", "a"],
+                Some((PartiallyOrdered, vec![3, 2, 1])),
+            ),
+        ];
+        for (case_idx, test_case) in test_cases.iter().enumerate() {
+            let (group_by_columns, expected) = &test_case;
+            let mut group_by_exprs = vec![];
+            for col_name in group_by_columns {
+                group_by_exprs.push((col(col_name, &test_schema)?, col_name.to_string()));
+            }
+            let group_bys = PhysicalGroupBy::new_single(group_by_exprs);
+            let res = get_working_mode(&input, &group_bys);
+            assert_eq!(
+                res, *expected,
+                "Unexpected result for unbounded test case #{:?}, case: {:?}",
+                case_idx, test_case
+            );
+        }
+
+        Ok(())
+    }
+
     /// some mock data to aggregates
     fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
         // define a schema.
@@ -940,9 +1167,7 @@ mod tests {
 
         let result =
             common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
-        assert_eq!(result.len(), 1);
-
-        let batch = &result[0];
+        let batch = concat_batches(&result[0].schema(), &result)?;
         assert_eq!(batch.num_columns(), 3);
         assert_eq!(batch.num_rows(), 12);
 
@@ -1037,9 +1262,7 @@ mod tests {
 
         let result =
             common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
-        assert_eq!(result.len(), 1);
-
-        let batch = &result[0];
+        let batch = concat_batches(&result[0].schema(), &result)?;
         assert_eq!(batch.num_columns(), 2);
         assert_eq!(batch.num_rows(), 3);
 
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index bf1846ae98..7b851aa1da 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::row::{RowConverter, SortField};
 use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
@@ -32,25 +32,26 @@ use futures::stream::{Stream, StreamExt};
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use crate::physical_plan::aggregates::utils::{
+    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
+    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
+};
 use crate::physical_plan::aggregates::{
-    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
-    AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
+    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
+    PhysicalGroupBy, RowAccumulatorItem,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
-use arrow::compute::{cast, filter};
-use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
+use arrow::compute::cast;
+use arrow::datatypes::DataType;
+use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
 use hashbrown::raw::RawTable;
 use itertools::izip;
 
@@ -68,7 +69,6 @@ use itertools::izip;
 /// 4. The state's RecordBatch is `merge`d to a new state
 /// 5. The state is mapped to the final value
 ///
-/// [Arrow-row]: OwnedRow
 /// [WordAligned]: datafusion_row::layout
 pub(crate) struct GroupedHashAggregateStream {
     schema: SchemaRef,
@@ -107,22 +107,6 @@ pub(crate) struct GroupedHashAggregateStream {
     indices: [Vec<Range<usize>>; 2],
 }
 
-#[derive(Debug)]
-/// tracks what phase the aggregation is in
-enum ExecutionState {
-    ReadingInput,
-    ProducingOutput,
-    Done,
-}
-
-fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
-    let fields = aggr_expr
-        .iter()
-        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
-        .collect::<Vec<_>>();
-    Ok(Arc::new(Schema::new(fields)))
-}
-
 impl GroupedHashAggregateStream {
     /// Create a new GroupedHashAggregateStream
     #[allow(clippy::too_many_arguments)]
@@ -617,25 +601,8 @@ impl GroupedHashAggregateStream {
     }
 }
 
-/// The state that is built for each output group.
-#[derive(Debug)]
-pub struct GroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: OwnedRow,
-
-    // Accumulator state, stored sequentially
-    pub aggregation_buffer: Vec<u8>,
-
-    // Accumulator state, one for each aggregate that doesn't support row accumulation
-    pub accumulator_set: Vec<AccumulatorItem>,
-
-    /// scratch space used to collect indices for input rows in a
-    /// bach that have values to aggregate. Reset on each batch
-    pub indices: Vec<u32>,
-}
-
 /// The state of all the groups
-pub struct AggregationState {
+pub(crate) struct AggregationState {
     pub reservation: MemoryReservation,
 
     /// Logically maps group values to an index in `group_states`
@@ -788,88 +755,3 @@ impl GroupedHashAggregateStream {
         Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
     }
 }
-
-fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
-    let row_num = rows.len();
-    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
-    let mut row = RowReader::new(schema);
-
-    for data in rows {
-        row.point_to(0, data);
-        read_row(&row, &mut output, schema);
-    }
-
-    output.output_as_columns()
-}
-
-fn get_at_indices(
-    input_values: &[Vec<ArrayRef>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    input_values
-        .iter()
-        .map(|array| get_arrayref_at_indices(array, batch_indices))
-        .collect()
-}
-
-fn get_optional_filters(
-    original_values: &[Option<Arc<dyn Array>>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
-    original_values
-        .iter()
-        .map(|array| {
-            array.as_ref().map(|array| {
-                compute::take(
-                    array.as_ref(),
-                    batch_indices,
-                    None, // None: no index check
-                )
-                .unwrap()
-            })
-        })
-        .collect()
-}
-
-fn slice_and_maybe_filter(
-    aggr_array: &[ArrayRef],
-    filter_opt: Option<&Arc<dyn Array>>,
-    offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
-    let sliced_arrays: Vec<ArrayRef> = aggr_array
-        .iter()
-        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
-        .collect();
-
-    let filtered_arrays = match filter_opt.as_ref() {
-        Some(f) => {
-            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
-            let filter_array = as_boolean_array(&sliced)?;
-
-            sliced_arrays
-                .iter()
-                .map(|array| filter(array, filter_array).unwrap())
-                .collect::<Vec<ArrayRef>>()
-        }
-        None => sliced_arrays,
-    };
-    Ok(filtered_arrays)
-}
-
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
-fn col_to_scalar(
-    array: &ArrayRef,
-    filter: &Option<&BooleanArray>,
-    row_index: usize,
-) -> Result<ScalarValue> {
-    if array.is_null(row_index) {
-        return Ok(ScalarValue::Null);
-    }
-    if let Some(filter) = filter {
-        if !filter.value(row_index) {
-            return Ok(ScalarValue::Null);
-        }
-    }
-    ScalarValue::try_from_array(array, row_index)
-}
diff --git a/datafusion/core/src/physical_plan/aggregates/utils.rs b/datafusion/core/src/physical_plan/aggregates/utils.rs
new file mode 100644
index 0000000000..54485b1740
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/utils.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_plan::aggregates::AccumulatorItem;
+use arrow::compute;
+use arrow::compute::filter;
+use arrow::row::OwnedRow;
+use arrow_array::types::UInt32Type;
+use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_physical_expr::AggregateExpr;
+use datafusion_row::reader::{read_row, RowReader};
+use datafusion_row::MutableRecordBatch;
+use std::sync::Arc;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+pub(crate) struct GroupState {
+    /// The actual group by values, stored sequentially
+    pub group_by_values: OwnedRow,
+
+    // Accumulator state, stored sequentially
+    pub aggregation_buffer: Vec<u8>,
+
+    // Accumulator state, one for each aggregate that doesn't support row accumulation
+    pub accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    pub indices: Vec<u32>,
+}
+
+#[derive(Debug)]
+/// tracks what phase the aggregation is in
+pub(crate) enum ExecutionState {
+    ReadingInput,
+    ProducingOutput,
+    Done,
+}
+
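+/// Returns the schema describing the intermediate (accumulator state) fields of the given aggregate expressions.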
+pub(crate) fn aggr_state_schema(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+) -> Result<SchemaRef> {
+    let fields = aggr_expr
+        .iter()
+        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
+        .collect::<Vec<_>>();
+    Ok(Arc::new(Schema::new(fields)))
+}
+
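+/// Converts row-format state buffers back into columnar arrays according to `schema`.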
+pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
+    let row_num = rows.len();
+    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
+    let mut row = RowReader::new(schema);
+
+    for data in rows {
+        row.point_to(0, data);
+        read_row(&row, &mut output, schema);
+    }
+
+    output.output_as_columns()
+}
+
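+/// Takes the rows at `batch_indices` from each array of every aggregate input.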
+pub(crate) fn get_at_indices(
+    input_values: &[Vec<ArrayRef>],
+    batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<Vec<ArrayRef>>> {
+    input_values
+        .iter()
+        .map(|array| get_arrayref_at_indices(array, batch_indices))
+        .collect()
+}
+
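+/// Takes the rows at `batch_indices` from each optional filter array, if one is present.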
+pub(crate) fn get_optional_filters(
+    original_values: &[Option<Arc<dyn Array>>],
+    batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Vec<Option<Arc<dyn Array>>> {
+    original_values
+        .iter()
+        .map(|array| {
+            array.as_ref().map(|array| {
+                compute::take(
+                    array.as_ref(),
+                    batch_indices,
+                    None, // None: no index check
+                )
+                .unwrap()
+            })
+        })
+        .collect()
+}
+
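+/// Slices the aggregate input arrays to the range `offsets[0]..offsets[1]` and applies the optional filter.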
+pub(crate) fn slice_and_maybe_filter(
+    aggr_array: &[ArrayRef],
+    filter_opt: Option<&Arc<dyn Array>>,
+    offsets: &[usize],
+) -> Result<Vec<ArrayRef>> {
+    let sliced_arrays: Vec<ArrayRef> = aggr_array
+        .iter()
+        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
+        .collect();
+
+    let filtered_arrays = match filter_opt.as_ref() {
+        Some(f) => {
+            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
+            let filter_array = as_boolean_array(&sliced)?;
+
+            sliced_arrays
+                .iter()
+                .map(|array| filter(array, filter_array).unwrap())
+                .collect::<Vec<ArrayRef>>()
+        }
+        None => sliced_arrays,
+    };
+    Ok(filtered_arrays)
+}
+
+/// This method is similar to `ScalarValue::try_from_array`, except for its null handling:
+/// it returns [`ScalarValue::Null`] instead of a typed null such as `ScalarValue::Int64(None)`.
+pub(crate) fn col_to_scalar(
+    array: &ArrayRef,
+    filter: &Option<&BooleanArray>,
+    row_index: usize,
+) -> Result<ScalarValue> {
+    if array.is_null(row_index) {
+        return Ok(ScalarValue::Null);
+    }
+    if let Some(filter) = filter {
+        if !filter.value(row_index) {
+            return Ok(ScalarValue::Null);
+        }
+    }
+    ScalarValue::try_from_array(array, row_index)
+}
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 2e12251c2f..08b5bb34ed 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index ec7dd7b4d6..82f37ea396 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 8ff8a37aa3..fe667d1e6e 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 494d3fc869..3786568cf3 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 44a91865e2..67b7fd9718 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.    
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] || children[1] {
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index c624c5c17d..05a3c25243 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         let (left, right) = (children[0], children[1]);
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 23d3d70848..4356d4aa85 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -514,13 +514,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
                         for (join_side, child) in join_sides.iter().zip(children.iter()) {
                             let sorted_expr = child
                                 .output_ordering()
-                                .and_then(|orders| orders.first())
-                                .and_then(|order| {
+                                .and_then(|orders| {
                                     build_filter_input_order(
                                         *join_side,
                                         filter,
                                         &child.schema(),
-                                        order,
+                                        &orders[0],
                                     )
                                     .transpose()
                                 })
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 50c145095b..01ece80aca 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -111,7 +111,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn output_partitioning(&self) -> Partitioning;
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
         Ok(false)
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 799de0c191..49c429f94d 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -156,7 +156,7 @@ impl ExecutionPlan for ProjectionExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 7f13418d26..8db230a122 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -318,7 +318,7 @@ impl ExecutionPlan for RepartitionExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.    
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 9196f520ad..5f644b658b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -739,7 +739,7 @@ impl ExecutionPlan for SortExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 0448813240..cace842aaf 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -157,7 +157,7 @@ impl ExecutionPlan for UnionExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children.iter().any(|x| *x))
@@ -355,7 +355,7 @@ impl ExecutionPlan for InterleaveExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children.iter().any(|x| *x))
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index 0d60273c30..9a9408035d 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for UnnestExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.    
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 5dbafab76f..ac84a6a395 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -171,8 +171,7 @@ impl BoundedWindowAggExec {
     // to calculate partition separation points
     pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
         // Partition by sort keys indices are stored in self.ordered_partition_by_indices.
-        let sort_keys = self.input.output_ordering();
-        let sort_keys = sort_keys.unwrap_or(&[]);
+        let sort_keys = self.input.output_ordering().unwrap_or(&[]);
         get_at_indices(sort_keys, &self.ordered_partition_by_indices)
     }
 
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index bda1b52ff8..7ca95954ce 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -144,7 +144,7 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
-    /// If the plan does not support pipelining, but it its input(s) are
+    /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.    
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
diff --git a/datafusion/core/tests/aggregate_fuzz.rs b/datafusion/core/tests/aggregate_fuzz.rs
new file mode 100644
index 0000000000..14cf469624
--- /dev/null
+++ b/datafusion/core/tests/aggregate_fuzz.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array};
+use arrow::compute::{concat_batches, SortOptions};
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_physical_expr::expressions::{col, Sum};
+use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
+use test_utils::add_empty_batches;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn aggregate_test() {
+        let test_cases = vec![
+            vec!["a"],
+            vec!["b", "a"],
+            vec!["c", "a"],
+            vec!["c", "b", "a"],
+            vec!["d", "a"],
+            vec!["d", "b", "a"],
+            vec!["d", "c", "a"],
+            vec!["d", "c", "b", "a"],
+        ];
+        let n = 300;
+        let distincts = vec![10, 20];
+        for distinct in distincts {
+            let mut handles = Vec::new();
+            for i in 0..n {
+                let test_idx = i % test_cases.len();
+                let group_by_columns = test_cases[test_idx].clone();
+                let job = tokio::spawn(run_aggregate_test(
+                    make_staggered_batches::<true>(1000, distinct, i as u64),
+                    group_by_columns,
+                ));
+                handles.push(job);
+            }
+            for job in handles {
+                job.await.unwrap();
+            }
+        }
+    }
+}
+
+/// Perform batch and streaming aggregation with the same input
+/// and verify that `AggregateExec` produces the same result with the pipeline-breaking
+/// stream `GroupedHashAggregateStream` and the non-pipeline-breaking stream `BoundedAggregateStream`.
+async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
+    let schema = input1[0].schema();
+    let session_config = SessionConfig::new().with_batch_size(50);
+    let ctx = SessionContext::with_config(session_config);
+    let mut sort_keys = vec![];
+    for ordering_col in ["a", "b", "c"] {
+        sort_keys.push(PhysicalSortExpr {
+            expr: col(ordering_col, &schema).unwrap(),
+            options: SortOptions::default(),
+        })
+    }
+
+    let concat_input_record = concat_batches(&schema, &input1).unwrap();
+    let usual_source = Arc::new(
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
+    );
+
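+    // Attaching the sort keys to this MemoryExec advertises the input ordering, so the
+    // aggregate can use the non-pipeline-breaking `BoundedAggregateStream` path.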
+    let running_source = Arc::new(
+        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+            .unwrap()
+            .with_sort_information(sort_keys),
+    );
+
+    let aggregate_expr = vec![Arc::new(Sum::new(
+        col("d", &schema).unwrap(),
+        "sum1",
+        DataType::Int64,
+    )) as Arc<dyn AggregateExpr>];
+    let expr = group_by_columns
+        .iter()
+        .map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(expr);
+    let aggregate_exec_running = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by.clone(),
+            aggregate_expr.clone(),
+            vec![None],
+            running_source,
+            schema.clone(),
+        )
+        .unwrap(),
+    ) as _;
+
+    let aggregate_exec_usual = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by.clone(),
+            aggregate_expr.clone(),
+            vec![None],
+            usual_source,
+            schema.clone(),
+        )
+        .unwrap(),
+    ) as _;
+
+    let task_ctx = ctx.task_ctx();
+    let collected_usual = collect(aggregate_exec_usual, task_ctx.clone())
+        .await
+        .unwrap();
+
+    let collected_running = collect(aggregate_exec_running, task_ctx.clone())
+        .await
+        .unwrap();
+    assert!(collected_running.len() > 2);
+    // The running mode should produce more chunks than the usual AggregateExec;
+    // otherwise it means we cannot generate results incrementally in running mode.
+    assert!(collected_running.len() > collected_usual.len());
+    // compare
+    let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
+    let running_formatted = pretty_format_batches(&collected_running)
+        .unwrap()
+        .to_string();
+
+    let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect();
+    usual_formatted_sorted.sort_unstable();
+
+    let mut running_formatted_sorted: Vec<&str> =
+        running_formatted.trim().lines().collect();
+    running_formatted_sorted.sort_unstable();
+    for (i, (usual_line, running_line)) in usual_formatted_sorted
+        .iter()
+        .zip(&running_formatted_sorted)
+        .enumerate()
+    {
+        assert_eq!((i, usual_line), (i, running_line), "Inconsistent result");
+    }
+}
+
+/// Return randomly sized record batches with:
+/// three sorted int64 columns 'a', 'b', 'c' ranging from 0..'n_distinct', and
+/// one random int64 column 'd'
+pub(crate) fn make_staggered_batches<const STREAM: bool>(
+    len: usize,
+    n_distinct: usize,
+    random_seed: u64,
+) -> Vec<RecordBatch> {
+    // use a random number generator to pick a random sized output
+    let mut rng = StdRng::seed_from_u64(random_seed);
+    let mut input123: Vec<(i64, i64, i64)> = vec![(0, 0, 0); len];
+    let mut input4: Vec<i64> = vec![0; len];
+    input123.iter_mut().for_each(|v| {
+        *v = (
+            rng.gen_range(0..n_distinct) as i64,
+            rng.gen_range(0..n_distinct) as i64,
+            rng.gen_range(0..n_distinct) as i64,
+        )
+    });
+    input4.iter_mut().for_each(|v| {
+        *v = rng.gen_range(0..n_distinct) as i64;
+    });
+    input123.sort();
+    let input1 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.0));
+    let input2 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.1));
+    let input3 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.2));
+    let input4 = Int64Array::from_iter_values(input4.into_iter());
+
+    // split into several record batches
+    let mut remainder = RecordBatch::try_from_iter(vec![
+        ("a", Arc::new(input1) as ArrayRef),
+        ("b", Arc::new(input2) as ArrayRef),
+        ("c", Arc::new(input3) as ArrayRef),
+        ("d", Arc::new(input4) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut batches = vec![];
+    if STREAM {
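+        // Streaming mode: emit many small batches (fewer than 50 rows each) so the
+        // aggregate receives its input incrementally.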
+        while remainder.num_rows() > 0 {
+            let batch_size = rng.gen_range(0..50);
+            if remainder.num_rows() < batch_size {
+                break;
+            }
+            batches.push(remainder.slice(0, batch_size));
+            remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+        }
+    } else {
+        while remainder.num_rows() > 0 {
+            let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+            batches.push(remainder.slice(0, batch_size));
+            remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+        }
+    }
+    add_empty_batches(batches, &mut rng)
+}
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index b4a92db3fc..299c49028b 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use super::*;
+use datafusion::test_util::get_test_context2;
 
 #[tokio::test]
 async fn group_by_date_trunc() -> Result<()> {
@@ -160,3 +161,99 @@ async fn group_by_dictionary() {
     run_test_case::<UInt32Type>().await;
     run_test_case::<UInt64Type>().await;
 }
+
+#[tokio::test]
+async fn test_source_sorted_groupby() -> Result<()> {
+    let tmpdir = TempDir::new().unwrap();
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
+
+    let sql = "SELECT a, b,
+           SUM(c) as summation1
+           FROM annotated_data
+           GROUP BY b, a";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
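+    // Since the source ordering covers both group by columns, the plan should not need a SortExec below the AggregateExec.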
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]",
+            "  AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)]",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---+---+------------+",
+        "| a | b | summation1 |",
+        "+---+---+------------+",
+        "| 0 | 0 | 300        |",
+        "| 0 | 1 | 925        |",
+        "| 1 | 2 | 1550       |",
+        "| 1 | 3 | 2175       |",
+        "+---+---+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_source_sorted_groupby2() -> Result<()> {
+    let tmpdir = TempDir::new().unwrap();
+    let session_config = SessionConfig::new().with_target_partitions(1);
+    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
+
+    let sql = "SELECT a, d,
+           SUM(c) as summation1
+           FROM annotated_data
+           GROUP BY d, a";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
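+    // Even though the group by columns are only partially ordered, the plan still avoids a SortExec below the AggregateExec.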
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]",
+            "  AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)]",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+---+---+------------+",
+        "| a | d | summation1 |",
+        "+---+---+------------+",
+        "| 0 | 0 | 292        |",
+        "| 0 | 2 | 196        |",
+        "| 0 | 1 | 315        |",
+        "| 0 | 4 | 164        |",
+        "| 0 | 3 | 258        |",
+        "| 1 | 0 | 622        |",
+        "| 1 | 3 | 299        |",
+        "| 1 | 1 | 1043       |",
+        "| 1 | 4 | 913        |",
+        "| 1 | 2 | 848        |",
+        "+---+---+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 8b95005e1b..60339955b4 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -356,23 +356,22 @@ Sort: d.b ASC NULLS LAST
                       EmptyRelation
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
-  SortExec: expr=[b@0 ASC NULLS LAST]
-    ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
-      AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
-        ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
-          BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-            SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
-              CoalesceBatchesExec: target_batch_size=8192
-                RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
-                  UnionExec
-                    ProjectionExec: expr=[1 as a, aa as b]
-                      EmptyExec: produce_one_row=true
-                    ProjectionExec: expr=[3 as a, aa as b]
-                      EmptyExec: produce_one_row=true
-                    ProjectionExec: expr=[5 as a, bb as b]
-                      EmptyExec: produce_one_row=true
-                    ProjectionExec: expr=[7 as a, bb as b]
-                      EmptyExec: produce_one_row=true
+  ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
+    AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)]
+      ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
+        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+          SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+            CoalesceBatchesExec: target_batch_size=8192
+              RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
+                UnionExec
+                  ProjectionExec: expr=[1 as a, aa as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[3 as a, aa as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[5 as a, bb as b]
+                    EmptyExec: produce_one_row=true
+                  ProjectionExec: expr=[7 as a, bb as b]
+                    EmptyExec: produce_one_row=true
 
 
 # check actual result
diff --git a/datafusion/core/tests/window_fuzz.rs b/datafusion/core/tests/window_fuzz.rs
index e8524304a3..77b6e0a5d1 100644
--- a/datafusion/core/tests/window_fuzz.rs
+++ b/datafusion/core/tests/window_fuzz.rs
@@ -480,6 +480,10 @@ async fn run_window_test(
     let collected_running = collect(running_window_exec, task_ctx.clone())
         .await
         .unwrap();
+
+    // BoundedWindowAggExec should produce more chunks than the usual WindowAggExec;
+    // otherwise it means we cannot generate results incrementally in running mode.
+    assert!(collected_running.len() > collected_usual.len());
     // compare
     let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
     let running_formatted = pretty_format_batches(&collected_running)