You are viewing a plain-text version of this content. The canonical link for it (shown as "here" in the original page) was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/07/19 10:50:34 UTC

[arrow-datafusion] branch main updated: Consolidate `BoundedAggregateStream` (#6932)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1810a15384 Consolidate `BoundedAggregateStream` (#6932)
1810a15384 is described below

commit 1810a15384791b00c0f796fbdfc8d0d5ddb5fdae
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jul 19 06:50:29 2023 -0400

    Consolidate `BoundedAggregateStream` (#6932)
    
    * Consolidate `BoundedAggregateStream`
    
    * Clarify end of input
    
    * Apply suggestions from code review
    
    Co-authored-by: Mustafa Akur <10...@users.noreply.github.com>
    
    * Update diagram for partial sort
    
    * assert input is not done
    
    * Apply suggestions from code review
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * clarify text
    
    * Add more comments about delta memory accounting
    
    ---------
    
    Co-authored-by: Mustafa Akur <10...@users.noreply.github.com>
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 .../aggregates/bounded_aggregate_stream.rs         | 1080 --------------------
 .../core/src/physical_plan/aggregates/mod.rs       |   19 +-
 .../src/physical_plan/aggregates/order/full.rs     |  163 +++
 .../core/src/physical_plan/aggregates/order/mod.rs |  139 +++
 .../src/physical_plan/aggregates/order/partial.rs  |  267 +++++
 .../core/src/physical_plan/aggregates/row_hash.rs  |  230 +++--
 .../core/src/physical_plan/aggregates/utils.rs     |  150 ---
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs |    5 +-
 datafusion/execution/src/memory_pool/mod.rs        |   47 +-
 datafusion/physical-expr/src/aggregate/average.rs  |   19 +-
 datafusion/physical-expr/src/aggregate/count.rs    |    9 +-
 .../src/aggregate/groups_accumulator/accumulate.rs |   37 +-
 .../src/aggregate/groups_accumulator/adapter.rs    |  112 +-
 .../src/aggregate/groups_accumulator/bool_op.rs    |   25 +-
 .../src/aggregate/groups_accumulator/mod.rs        |   62 +-
 .../src/aggregate/groups_accumulator/prim_op.rs    |   12 +-
 datafusion/physical-expr/src/lib.rs                |    5 +-
 17 files changed, 983 insertions(+), 1398 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
deleted file mode 100644
index 5982701e21..0000000000
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ /dev/null
@@ -1,1080 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This file implements streaming aggregation on ordered GROUP BY expressions.
-//! Generated output will itself have an ordering and the executor can run with
-//! bounded memory, ensuring composability in streaming cases.
-
-use std::cmp::min;
-use std::ops::Range;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::vec;
-
-use ahash::RandomState;
-use futures::ready;
-use futures::stream::{Stream, StreamExt};
-use hashbrown::raw::RawTable;
-use itertools::izip;
-
-use crate::physical_plan::aggregates::{
-    evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
-    AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
-};
-use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
-use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use datafusion_execution::TaskContext;
-
-use crate::physical_plan::aggregates::utils::{
-    aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
-    read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
-use arrow::array::{new_null_array, ArrayRef, UInt32Builder};
-use arrow::compute::{cast, SortColumn};
-use arrow::datatypes::DataType;
-use arrow::row::{OwnedRow, RowConverter, SortField};
-use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::{evaluate_partition_ranges, get_row_at_idx};
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::Accumulator;
-use datafusion_physical_expr::hash_utils::create_hashes;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
-
-use super::AggregateExec;
-
-/// Grouping aggregate with row-format aggregation states inside.
-///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
-/// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when updates over every field are often.
-///
-/// The architecture is the following:
-///
-/// 1. For each input RecordBatch, update aggregation states corresponding to all appeared grouping keys.
-/// 2. At the end of the aggregation (e.g. end of batches in a partition), the accumulator converts its state to a RecordBatch of a single row
-/// 3. The RecordBatches of all accumulators are merged (`concatenate` in `rust/arrow`) together to a single RecordBatch.
-/// 4. The state's RecordBatch is `merge`d to a new state
-/// 5. The state is mapped to the final value
-///
-/// [Arrow-row]: OwnedRow
-/// [WordAligned]: datafusion_row::layout
-pub(crate) struct BoundedAggregateStream {
-    schema: SchemaRef,
-    input: SendableRecordBatchStream,
-    mode: AggregateMode,
-
-    normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    /// Aggregate expressions not supporting row accumulation
-    normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each normal aggregate expression
-    normal_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-
-    /// Aggregate expressions supporting row accumulation
-    row_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression for each row aggregate expression
-    row_filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
-    row_accumulators: Vec<RowAccumulatorItem>,
-    row_converter: RowConverter,
-    row_aggr_schema: SchemaRef,
-    row_aggr_layout: Arc<RowLayout>,
-
-    group_by: PhysicalGroupBy,
-
-    aggr_state: AggregationState,
-    exec_state: ExecutionState,
-    baseline_metrics: BaselineMetrics,
-    random_state: RandomState,
-    /// size to be used for resulting RecordBatches
-    batch_size: usize,
-    /// threshold for using `ScalarValue`s to update
-    /// accumulators during high-cardinality aggregations for each input batch.
-    scalar_update_factor: usize,
-    /// if the result is chunked into batches,
-    /// last offset is preserved for continuation.
-    row_group_skip_position: usize,
-    /// keeps range for each accumulator in the field
-    /// first element in the array corresponds to normal accumulators
-    /// second element in the array corresponds to row accumulators
-    indices: [Vec<Range<usize>>; 2],
-    /// Information on how the input of this group is ordered
-    aggregation_ordering: AggregationOrdering,
-    /// Has this stream finished producing output
-    is_end: bool,
-}
-
-impl BoundedAggregateStream {
-    /// Create a new BoundedAggregateStream
-    pub fn new(
-        agg: &AggregateExec,
-        context: Arc<TaskContext>,
-        partition: usize,
-        aggregation_ordering: AggregationOrdering, // Stores algorithm mode and output ordering
-    ) -> Result<Self> {
-        let agg_schema = Arc::clone(&agg.schema);
-        let agg_group_by = agg.group_by.clone();
-        let agg_filter_expr = agg.filter_expr.clone();
-
-        let batch_size = context.session_config().batch_size();
-        let scalar_update_factor = context.session_config().agg_scalar_update_factor();
-        let input = agg.input.execute(partition, Arc::clone(&context))?;
-        let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
-
-        let timer = baseline_metrics.elapsed_compute().timer();
-
-        let mut start_idx = agg_group_by.expr.len();
-        let mut row_aggr_expr = vec![];
-        let mut row_agg_indices = vec![];
-        let mut row_aggregate_expressions = vec![];
-        let mut row_filter_expressions = vec![];
-        let mut normal_aggr_expr = vec![];
-        let mut normal_agg_indices = vec![];
-        let mut normal_aggregate_expressions = vec![];
-        let mut normal_filter_expressions = vec![];
-        // The expressions to evaluate the batch, one vec of expressions per aggregation.
-        // Assuming create_schema() always puts group columns in front of aggregation columns, we set
-        // col_idx_base to the group expression count.
-        let all_aggregate_expressions =
-            aggregates::aggregate_expressions(&agg.aggr_expr, &agg.mode, start_idx)?;
-        let filter_expressions = match agg.mode {
-            AggregateMode::Partial
-            | AggregateMode::Single
-            | AggregateMode::SinglePartitioned => agg_filter_expr,
-            AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                vec![None; agg.aggr_expr.len()]
-            }
-        };
-        for ((expr, others), filter) in agg
-            .aggr_expr
-            .iter()
-            .zip(all_aggregate_expressions.into_iter())
-            .zip(filter_expressions.into_iter())
-        {
-            let n_fields = match agg.mode {
-                // In partial aggregation, we keep additional fields in order to successfully
-                // merge aggregation results downstream.
-                AggregateMode::Partial => expr.state_fields()?.len(),
-                _ => 1,
-            };
-            // Stores range of each expression:
-            let aggr_range = Range {
-                start: start_idx,
-                end: start_idx + n_fields,
-            };
-            if expr.row_accumulator_supported() {
-                row_aggregate_expressions.push(others);
-                row_filter_expressions.push(filter.clone());
-                row_agg_indices.push(aggr_range);
-                row_aggr_expr.push(expr.clone());
-            } else {
-                normal_aggregate_expressions.push(others);
-                normal_filter_expressions.push(filter.clone());
-                normal_agg_indices.push(aggr_range);
-                normal_aggr_expr.push(expr.clone());
-            }
-            start_idx += n_fields;
-        }
-
-        let row_accumulators = aggregates::create_row_accumulators(&row_aggr_expr)?;
-
-        let row_aggr_schema = aggr_state_schema(&row_aggr_expr);
-
-        let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
-        let row_converter = RowConverter::new(
-            group_schema
-                .fields()
-                .iter()
-                .map(|f| SortField::new(f.data_type().clone()))
-                .collect(),
-        )?;
-
-        let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));
-
-        let name = format!("BoundedAggregateStream[{partition}]");
-        let aggr_state = AggregationState {
-            reservation: MemoryConsumer::new(name).register(context.memory_pool()),
-            map: RawTable::with_capacity(0),
-            ordered_group_states: Vec::with_capacity(0),
-        };
-
-        timer.done();
-
-        let exec_state = ExecutionState::ReadingInput;
-
-        Ok(BoundedAggregateStream {
-            schema: agg_schema,
-            input,
-            mode: agg.mode,
-            normal_aggr_expr,
-            normal_aggregate_expressions,
-            normal_filter_expressions,
-            row_aggregate_expressions,
-            row_filter_expressions,
-            row_accumulators,
-            row_converter,
-            row_aggr_schema,
-            row_aggr_layout,
-            group_by: agg_group_by,
-            aggr_state,
-            exec_state,
-            baseline_metrics,
-            random_state: Default::default(),
-            batch_size,
-            scalar_update_factor,
-            row_group_skip_position: 0,
-            indices: [normal_agg_indices, row_agg_indices],
-            is_end: false,
-            aggregation_ordering,
-        })
-    }
-}
-
-impl Stream for BoundedAggregateStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
-
-        loop {
-            match self.exec_state {
-                ExecutionState::ReadingInput => {
-                    match ready!(self.input.poll_next_unpin(cx)) {
-                        // new batch to aggregate
-                        Some(Ok(batch)) => {
-                            let timer = elapsed_compute.timer();
-                            let result = self.group_aggregate_batch(batch);
-                            timer.done();
-
-                            // allocate memory
-                            // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
-                            // overshooting a bit. Also this means we either store the whole record batch or not.
-                            let result = result.and_then(|allocated| {
-                                self.aggr_state.reservation.try_grow(allocated)
-                            });
-
-                            if let Err(e) = result {
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        }
-                        // inner had error, return to caller
-                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
-                        // inner is done, switch to producing output
-                        None => {
-                            let states = self.aggr_state.ordered_group_states.iter_mut();
-                            for state in states {
-                                state.status = GroupStatus::CanEmit;
-                            }
-                            self.exec_state = ExecutionState::ProducingOutput;
-                        }
-                    }
-                }
-
-                ExecutionState::ProducingOutput => {
-                    let timer = elapsed_compute.timer();
-                    let result = self.create_batch_from_map();
-
-                    timer.done();
-
-                    match result {
-                        // made output
-                        Ok(Some(result)) => {
-                            let batch = result.record_output(&self.baseline_metrics);
-                            self.row_group_skip_position += batch.num_rows();
-                            // try to read more input
-                            self.exec_state = ExecutionState::ReadingInput;
-                            self.prune();
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        // end of output
-                        Ok(None) => {
-                            self.exec_state = ExecutionState::Done;
-                        }
-                        // error making output
-                        Err(error) => return Poll::Ready(Some(Err(error))),
-                    }
-                }
-                ExecutionState::Done => return Poll::Ready(None),
-            }
-        }
-    }
-}
-
-impl RecordBatchStream for BoundedAggregateStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-/// This utility object encapsulates the row object, the hash and the group
-/// indices for a group. This information is used when executing streaming
-/// GROUP BY calculations.
-struct GroupOrderInfo {
-    /// The group by key
-    owned_row: OwnedRow,
-    /// the hash value of the group
-    hash: u64,
-    /// the range of row indices in the input batch that belong to this group
-    range: Range<usize>,
-}
-
-impl BoundedAggregateStream {
-    /// Update the aggr_state hash table according to group_by values
-    /// (result of group_by_expressions) when group by expressions are
-    /// fully ordered.
-    fn update_fully_ordered_group_state(
-        &mut self,
-        group_values: &[ArrayRef],
-        per_group_order_info: Vec<GroupOrderInfo>,
-        allocated: &mut usize,
-    ) -> Result<Vec<usize>> {
-        // 1.1 construct the key from the group values
-        // 1.2 construct the mapping key if it does not exist
-        // 1.3 add the row' index to `indices`
-
-        // track which entries in `aggr_state` have rows in this batch to aggregate
-        let mut groups_with_rows = vec![];
-
-        let AggregationState {
-            map: row_map,
-            ordered_group_states,
-            ..
-        } = &mut self.aggr_state;
-
-        for GroupOrderInfo {
-            owned_row,
-            hash,
-            range,
-        } in per_group_order_info
-        {
-            let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
-                // verify that a group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx  (aka group_values @ row)
-                let ordered_group_state = &ordered_group_states[*group_idx];
-                let group_state = &ordered_group_state.group_state;
-                owned_row.row() == group_state.group_by_values.row()
-            });
-
-            match entry {
-                // Existing entry for this group value
-                Some((_hash, group_idx)) => {
-                    let group_state = &mut ordered_group_states[*group_idx].group_state;
-
-                    // 1.3
-                    if group_state.indices.is_empty() {
-                        groups_with_rows.push(*group_idx);
-                    };
-                    for row in range.start..range.end {
-                        // remember this row
-                        group_state.indices.push_accounted(row as u32, allocated);
-                    }
-                }
-                //  1.2 Need to create new entry
-                None => {
-                    let accumulator_set =
-                        aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                    // Save the value of the ordering columns as Vec<ScalarValue>
-                    let row = get_row_at_idx(group_values, range.start)?;
-                    let ordered_columns = self
-                        .aggregation_ordering
-                        .order_indices
-                        .iter()
-                        .map(|idx| row[*idx].clone())
-                        .collect::<Vec<_>>();
-
-                    // Add new entry to group_states and save newly created index
-                    let group_state = GroupState {
-                        group_by_values: owned_row,
-                        aggregation_buffer: vec![
-                            0;
-                            self.row_aggr_layout.fixed_part_width()
-                        ],
-                        accumulator_set,
-                        indices: (range.start as u32..range.end as u32)
-                            .collect::<Vec<_>>(), // 1.3
-                    };
-                    let group_idx = ordered_group_states.len();
-
-                    // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
-                    // `group_states` (see allocation down below)
-                    *allocated += std::mem::size_of_val(&group_state.group_by_values)
-                        + (std::mem::size_of::<u8>()
-                            * group_state.aggregation_buffer.capacity())
-                        + (std::mem::size_of::<u32>() * group_state.indices.capacity());
-
-                    // Allocation done by normal accumulators
-                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                        * group_state.accumulator_set.capacity())
-                        + group_state
-                            .accumulator_set
-                            .iter()
-                            .map(|accu| accu.size())
-                            .sum::<usize>();
-
-                    // for hasher function, use precomputed hash value
-                    row_map.insert_accounted(
-                        (hash, group_idx),
-                        |(hash, _group_index)| *hash,
-                        allocated,
-                    );
-
-                    let ordered_group_state = OrderedGroupState {
-                        group_state,
-                        ordered_columns,
-                        status: GroupStatus::GroupInProgress,
-                        hash,
-                    };
-                    ordered_group_states.push_accounted(ordered_group_state, allocated);
-
-                    groups_with_rows.push(group_idx);
-                }
-            };
-        }
-        Ok(groups_with_rows)
-    }
-
-    // Update the aggr_state according to group_by values (result of group_by_expressions)
-    fn update_group_state(
-        &mut self,
-        group_values: &[ArrayRef],
-        allocated: &mut usize,
-    ) -> Result<Vec<usize>> {
-        // 1.1 construct the key from the group values
-        // 1.2 construct the mapping key if it does not exist
-        // 1.3 add the row' index to `indices`
-
-        // track which entries in `aggr_state` have rows in this batch to aggregate
-        let mut groups_with_rows = vec![];
-
-        let group_rows = self.row_converter.convert_columns(group_values)?;
-        let n_rows = group_rows.num_rows();
-        // 1.1 Calculate the group keys for the group values
-        let mut batch_hashes = vec![0; n_rows];
-        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-
-        let AggregationState {
-            map,
-            ordered_group_states: group_states,
-            ..
-        } = &mut self.aggr_state;
-
-        for (row, hash) in batch_hashes.into_iter().enumerate() {
-            let entry = map.get_mut(hash, |(_hash, group_idx)| {
-                // verify that a group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx  (aka group_values @ row)
-                let group_state = &group_states[*group_idx].group_state;
-                group_rows.row(row) == group_state.group_by_values.row()
-            });
-
-            match entry {
-                // Existing entry for this group value
-                Some((_hash, group_idx)) => {
-                    let group_state = &mut group_states[*group_idx].group_state;
-
-                    // 1.3
-                    if group_state.indices.is_empty() {
-                        groups_with_rows.push(*group_idx);
-                    };
-
-                    group_state.indices.push_accounted(row as u32, allocated); // remember this row
-                }
-                //  1.2 Need to create new entry
-                None => {
-                    let accumulator_set =
-                        aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                    let row_values = get_row_at_idx(group_values, row)?;
-                    let ordered_columns = self
-                        .aggregation_ordering
-                        .order_indices
-                        .iter()
-                        .map(|idx| row_values[*idx].clone())
-                        .collect::<Vec<_>>();
-                    let group_state = GroupState {
-                        group_by_values: group_rows.row(row).owned(),
-                        aggregation_buffer: vec![
-                            0;
-                            self.row_aggr_layout.fixed_part_width()
-                        ],
-                        accumulator_set,
-                        indices: vec![row as u32], // 1.3
-                    };
-                    let group_idx = group_states.len();
-
-                    // NOTE: do NOT include the `GroupState` struct size in here because this is captured by
-                    // `group_states` (see allocation down below)
-                    *allocated += std::mem::size_of_val(&group_state.group_by_values)
-                        + (std::mem::size_of::<u8>()
-                            * group_state.aggregation_buffer.capacity())
-                        + (std::mem::size_of::<u32>() * group_state.indices.capacity());
-
-                    // Allocation done by normal accumulators
-                    *allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                        * group_state.accumulator_set.capacity())
-                        + group_state
-                            .accumulator_set
-                            .iter()
-                            .map(|accu| accu.size())
-                            .sum::<usize>();
-
-                    // for hasher function, use precomputed hash value
-                    map.insert_accounted(
-                        (hash, group_idx),
-                        |(hash, _group_index)| *hash,
-                        allocated,
-                    );
-
-                    // Add new entry to group_states and save newly created index
-                    let ordered_group_state = OrderedGroupState {
-                        group_state,
-                        ordered_columns,
-                        status: GroupStatus::GroupInProgress,
-                        hash,
-                    };
-                    group_states.push_accounted(ordered_group_state, allocated);
-
-                    groups_with_rows.push(group_idx);
-                }
-            };
-        }
-        Ok(groups_with_rows)
-    }
-
-    // Update the accumulator results, according to aggr_state.
-    #[allow(clippy::too_many_arguments)]
-    fn update_accumulators_using_batch(
-        &mut self,
-        groups_with_rows: &[usize],
-        offsets: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        normal_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-        normal_filter_values: &[Option<ArrayRef>],
-        allocated: &mut usize,
-    ) -> Result<()> {
-        // 2.1 for each key in this batch
-        // 2.2 for each aggregation
-        // 2.3 `slice` from each of its arrays the keys' values
-        // 2.4 update / merge the accumulator with the values
-        // 2.5 clear indices
-        groups_with_rows
-            .iter()
-            .zip(offsets.windows(2))
-            .try_for_each(|(group_idx, offsets)| {
-                let group_state =
-                    &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
-                // 2.2
-                // Process row accumulators
-                self.row_accumulators
-                    .iter_mut()
-                    .zip(row_values.iter())
-                    .zip(row_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let mut state_accessor =
-                            RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-                        state_accessor
-                            .point_to(0, group_state.aggregation_buffer.as_mut_slice());
-                        match self.mode {
-                            AggregateMode::Partial
-                            | AggregateMode::Single
-                            | AggregateMode::SinglePartitioned => {
-                                accumulator.update_batch(&values, &mut state_accessor)
-                            }
-                            AggregateMode::FinalPartitioned | AggregateMode::Final => {
-                                // note: the aggregation here is over states, not values, thus the merge
-                                accumulator.merge_batch(&values, &mut state_accessor)
-                            }
-                        }
-                    })?;
-                // normal accumulators
-                group_state
-                    .accumulator_set
-                    .iter_mut()
-                    .zip(normal_values.iter())
-                    .zip(normal_filter_values.iter())
-                    .try_for_each(|((accumulator, aggr_array), filter_opt)| {
-                        let values = slice_and_maybe_filter(
-                            aggr_array,
-                            filter_opt.as_ref(),
-                            offsets,
-                        )?;
-                        let size_pre = accumulator.size();
-                        let res = match self.mode {
-                            AggregateMode::Partial
-                            | AggregateMode::Single
-                            | AggregateMode::SinglePartitioned => {
-                                accumulator.update_batch(&values)
-                            }
-                            AggregateMode::FinalPartitioned | AggregateMode::Final => {
-                                // note: the aggregation here is over states, not values, thus the merge
-                                accumulator.merge_batch(&values)
-                            }
-                        };
-                        let size_post = accumulator.size();
-                        *allocated += size_post.saturating_sub(size_pre);
-                        res
-                    })
-                    // 2.5
-                    .and({
-                        group_state.indices.clear();
-                        Ok(())
-                    })
-            })?;
-        Ok(())
-    }
-
-    // Update the accumulator results, according to aggr_state.
-    fn update_accumulators_using_scalar(
-        &mut self,
-        groups_with_rows: &[usize],
-        row_values: &[Vec<ArrayRef>],
-        row_filter_values: &[Option<ArrayRef>],
-    ) -> Result<()> {
-        let filter_bool_array = row_filter_values
-            .iter()
-            .map(|filter_opt| match filter_opt {
-                Some(f) => Ok(Some(as_boolean_array(f)?)),
-                None => Ok(None),
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        for group_idx in groups_with_rows {
-            let group_state =
-                &mut self.aggr_state.ordered_group_states[*group_idx].group_state;
-            let mut state_accessor =
-                RowAccessor::new_from_layout(self.row_aggr_layout.clone());
-            state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
-            for idx in &group_state.indices {
-                for (accumulator, values_array, filter_array) in izip!(
-                    self.row_accumulators.iter_mut(),
-                    row_values.iter(),
-                    filter_bool_array.iter()
-                ) {
-                    if values_array.len() == 1 {
-                        let scalar_value =
-                            col_to_scalar(&values_array[0], filter_array, *idx as usize)?;
-                        accumulator.update_scalar(&scalar_value, &mut state_accessor)?;
-                    } else {
-                        let scalar_values = values_array
-                            .iter()
-                            .map(|array| {
-                                col_to_scalar(array, filter_array, *idx as usize)
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        accumulator
-                            .update_scalar_values(&scalar_values, &mut state_accessor)?;
-                    }
-                }
-            }
-            // clear the group indices in this group
-            group_state.indices.clear();
-        }
-
-        Ok(())
-    }
-
-    /// Perform group-by aggregation for the given [`RecordBatch`].
-    ///
-    /// If successful, this returns the additional number of bytes that were allocated during this process.
-    ///
-    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
-        // Evaluate the grouping expressions:
-        let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
-        // Keep track of memory allocated:
-        let mut allocated = 0usize;
-
-        // Evaluate the aggregation expressions.
-        // We could evaluate them after the `take`, but since we need to evaluate all
-        // of them anyways, it is more performant to do it while they are together.
-        let row_aggr_input_values =
-            evaluate_many(&self.row_aggregate_expressions, &batch)?;
-        let normal_aggr_input_values =
-            evaluate_many(&self.normal_aggregate_expressions, &batch)?;
-        let row_filter_values = evaluate_optional(&self.row_filter_expressions, &batch)?;
-        let normal_filter_values =
-            evaluate_optional(&self.normal_filter_expressions, &batch)?;
-
-        let row_converter_size_pre = self.row_converter.size();
-        for group_values in &group_by_values {
-            // If the input is fully sorted on its grouping keys
-            let groups_with_rows = if let AggregationOrdering {
-                mode: GroupByOrderMode::FullyOrdered,
-                order_indices,
-                ordering,
-            } = &self.aggregation_ordering
-            {
-                let group_rows = self.row_converter.convert_columns(group_values)?;
-                let n_rows = group_rows.num_rows();
-                // 1.1 Calculate the group keys for the group values
-                let mut batch_hashes = vec![0; n_rows];
-                create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-                let sort_column = order_indices
-                    .iter()
-                    .enumerate()
-                    .map(|(idx, cur_idx)| SortColumn {
-                        values: group_values[*cur_idx].clone(),
-                        options: Some(ordering[idx].options),
-                    })
-                    .collect::<Vec<_>>();
-                let n_rows = group_rows.num_rows();
-                // determine the boundaries between groups
-                let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
-                let per_group_order_info = ranges
-                    .into_iter()
-                    .map(|range| GroupOrderInfo {
-                        owned_row: group_rows.row(range.start).owned(),
-                        hash: batch_hashes[range.start],
-                        range,
-                    })
-                    .collect::<Vec<_>>();
-                self.update_fully_ordered_group_state(
-                    group_values,
-                    per_group_order_info,
-                    &mut allocated,
-                )?
-            } else {
-                self.update_group_state(group_values, &mut allocated)?
-            };
-
-            // Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet:
-            // 1) The aggregation mode is Partial or Single
-            // 2) There is not normal aggregation expressions
-            // 3) The number of affected groups is high (entries in `aggr_state` have rows need to update). Usually the high cardinality case
-            if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
-                && normal_aggr_input_values.is_empty()
-                && normal_filter_values.is_empty()
-                && groups_with_rows.len() >= batch.num_rows() / self.scalar_update_factor
-            {
-                self.update_accumulators_using_scalar(
-                    &groups_with_rows,
-                    &row_aggr_input_values,
-                    &row_filter_values,
-                )?;
-            } else {
-                // Collect all indices + offsets based on keys in this vec
-                let mut batch_indices = UInt32Builder::with_capacity(0);
-                let mut offsets = vec![0];
-                let mut offset_so_far = 0;
-                for &group_idx in groups_with_rows.iter() {
-                    let indices = &self.aggr_state.ordered_group_states[group_idx]
-                        .group_state
-                        .indices;
-                    batch_indices.append_slice(indices);
-                    offset_so_far += indices.len();
-                    offsets.push(offset_so_far);
-                }
-                let batch_indices = batch_indices.finish();
-
-                let row_filter_values =
-                    get_optional_filters(&row_filter_values, &batch_indices);
-                let normal_filter_values =
-                    get_optional_filters(&normal_filter_values, &batch_indices);
-                if self.aggregation_ordering.mode == GroupByOrderMode::FullyOrdered {
-                    self.update_accumulators_using_batch(
-                        &groups_with_rows,
-                        &offsets,
-                        &row_aggr_input_values,
-                        &normal_aggr_input_values,
-                        &row_filter_values,
-                        &normal_filter_values,
-                        &mut allocated,
-                    )?;
-                } else {
-                    let row_values =
-                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                    let normal_values =
-                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
-                    self.update_accumulators_using_batch(
-                        &groups_with_rows,
-                        &offsets,
-                        &row_values,
-                        &normal_values,
-                        &row_filter_values,
-                        &normal_filter_values,
-                        &mut allocated,
-                    )?;
-                };
-            }
-        }
-        allocated += self
-            .row_converter
-            .size()
-            .saturating_sub(row_converter_size_pre);
-
-        let mut new_result = false;
-        let last_ordered_columns = self
-            .aggr_state
-            .ordered_group_states
-            .last()
-            .map(|item| item.ordered_columns.clone());
-
-        if let Some(last_ordered_columns) = last_ordered_columns {
-            for cur_group in &mut self.aggr_state.ordered_group_states {
-                if cur_group.ordered_columns != last_ordered_columns {
-                    // We will no longer receive value. Set status to GroupStatus::CanEmit
-                    // meaning we can generate result for this group.
-                    cur_group.status = GroupStatus::CanEmit;
-                    new_result = true;
-                }
-            }
-        }
-        if new_result {
-            self.exec_state = ExecutionState::ProducingOutput;
-        }
-
-        Ok(allocated)
-    }
-}
-
-/// Tracks the state of the ordered grouping
-#[derive(Debug, PartialEq)]
-enum GroupStatus {
-    /// Data for current group is not complete, and new data may yet
-    /// arrive.
-    GroupInProgress,
-    /// Data for current group is completed, and its result can emitted.
-    CanEmit,
-    /// Result for the groups has been successfully emitted, and group
-    /// state can be pruned.
-    Emitted,
-}
-
-/// Information about the order of the state that is built for each
-/// output group.
-#[derive(Debug)]
-pub struct OrderedGroupState {
-    /// Aggregate values
-    group_state: GroupState,
-    /// The actual value of the ordered columns for this group
-    ordered_columns: Vec<ScalarValue>,
-    /// Can we emit this group?
-    status: GroupStatus,
-    /// Hash value of the group
-    hash: u64,
-}
-
-/// The state of all the groups
-pub struct AggregationState {
-    pub reservation: MemoryReservation,
-
-    /// Logically maps group values to an index in `group_states`
-    ///
-    /// Uses the raw API of hashbrown to avoid actually storing the
-    /// keys in the table
-    ///
-    /// keys: u64 hashes of the GroupValue
-    /// values: (hash, index into `group_states`)
-    pub map: RawTable<(u64, usize)>,
-
-    /// State for each group
-    pub ordered_group_states: Vec<OrderedGroupState>,
-}
-
-impl std::fmt::Debug for AggregationState {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        // hashes are not store inline, so could only get values
-        let map_string = "RawTable";
-        f.debug_struct("AggregationState")
-            .field("map", &map_string)
-            .field("ordered_group_states", &self.ordered_group_states)
-            .finish()
-    }
-}
-
-impl BoundedAggregateStream {
-    /// Prune the groups from `[Self::ordered_group_states]` which are in
-    /// [`GroupStatus::Emitted`].
-    ///
-    /// Emitted means that the result of this group has already been
-    /// emitted, and we are sure that these groups can not receive new
-    /// rows.
-    fn prune(&mut self) {
-        // clear out emitted groups
-        let n_partition = self.aggr_state.ordered_group_states.len();
-        self.aggr_state
-            .ordered_group_states
-            .retain(|elem| elem.status != GroupStatus::Emitted);
-
-        let n_partition_new = self.aggr_state.ordered_group_states.len();
-        let n_pruned = n_partition - n_partition_new;
-
-        // update hash table with the new indexes of the remaining groups
-        self.aggr_state.map.clear();
-        for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() {
-            self.aggr_state
-                .map
-                .insert(item.hash, (item.hash, idx), |(hash, _)| *hash);
-        }
-        self.row_group_skip_position -= n_pruned;
-    }
-
-    /// Create a RecordBatch with all group keys and accumulator' states or values.
-    fn create_batch_from_map(&mut self) -> Result<Option<RecordBatch>> {
-        let skip_items = self.row_group_skip_position;
-        if skip_items > self.aggr_state.ordered_group_states.len() || self.is_end {
-            return Ok(None);
-        }
-        self.is_end |= skip_items == self.aggr_state.ordered_group_states.len();
-        if self.aggr_state.ordered_group_states.is_empty() {
-            let schema = self.schema.clone();
-            return Ok(Some(RecordBatch::new_empty(schema)));
-        }
-
-        let end_idx = min(
-            skip_items + self.batch_size,
-            self.aggr_state.ordered_group_states.len(),
-        );
-        let group_state_chunk =
-            &self.aggr_state.ordered_group_states[skip_items..end_idx];
-
-        // Consider only the groups that can be emitted. (The ones we
-        // are sure that will not receive new entry.)
-        let group_state_chunk = group_state_chunk
-            .iter()
-            .filter(|item| item.status == GroupStatus::CanEmit)
-            .collect::<Vec<_>>();
-
-        if group_state_chunk.is_empty() {
-            let schema = self.schema.clone();
-            return Ok(Some(RecordBatch::new_empty(schema)));
-        }
-
-        // Buffers for each distinct group (i.e. row accumulator memories)
-        let mut state_buffers = group_state_chunk
-            .iter()
-            .map(|gs| gs.group_state.aggregation_buffer.clone())
-            .collect::<Vec<_>>();
-
-        let output_fields = self.schema.fields();
-        // Store row accumulator results (either final output or intermediate state):
-        let row_columns = match self.mode {
-            AggregateMode::Partial => {
-                read_as_batch(&state_buffers, &self.row_aggr_schema)
-            }
-            AggregateMode::Final
-            | AggregateMode::FinalPartitioned
-            | AggregateMode::Single
-            | AggregateMode::SinglePartitioned => {
-                let mut results = vec![];
-                for (idx, acc) in self.row_accumulators.iter().enumerate() {
-                    let mut state_accessor = RowAccessor::new(&self.row_aggr_schema);
-                    let current = state_buffers
-                        .iter_mut()
-                        .map(|buffer| {
-                            state_accessor.point_to(0, buffer);
-                            acc.evaluate(&state_accessor)
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    // Get corresponding field for row accumulator
-                    let field = &output_fields[self.indices[1][idx].start];
-                    let result = if current.is_empty() {
-                        Ok(arrow::array::new_empty_array(field.data_type()))
-                    } else {
-                        let item = ScalarValue::iter_to_array(current)?;
-                        // cast output if needed (e.g. for types like Dictionary where
-                        // the intermediate GroupByScalar type was not the same as the
-                        // output
-                        cast(&item, field.data_type())
-                    }?;
-                    results.push(result);
-                }
-                results
-            }
-        };
-
-        // Store normal accumulator results (either final output or intermediate state):
-        let mut columns = vec![];
-        for (idx, &Range { start, end }) in self.indices[0].iter().enumerate() {
-            for (field_idx, field) in output_fields[start..end].iter().enumerate() {
-                let current = match self.mode {
-                    AggregateMode::Partial => ScalarValue::iter_to_array(
-                        group_state_chunk.iter().map(|group_state| {
-                            group_state.group_state.accumulator_set[idx]
-                                .state()
-                                .map(|v| v[field_idx].clone())
-                                .expect("Unexpected accumulator state in hash aggregate")
-                        }),
-                    ),
-                    AggregateMode::Final
-                    | AggregateMode::FinalPartitioned
-                    | AggregateMode::Single
-                    | AggregateMode::SinglePartitioned => ScalarValue::iter_to_array(
-                        group_state_chunk.iter().map(|group_state| {
-                            group_state.group_state.accumulator_set[idx]
-                                .evaluate()
-                                .expect("Unexpected accumulator state in hash aggregate")
-                        }),
-                    ),
-                }?;
-                // Cast output if needed (e.g. for types like Dictionary where
-                // the intermediate GroupByScalar type was not the same as the
-                // output
-                let result = cast(&current, field.data_type())?;
-                columns.push(result);
-            }
-        }
-
-        // Stores the group by fields
-        let group_buffers = group_state_chunk
-            .iter()
-            .map(|gs| gs.group_state.group_by_values.row())
-            .collect::<Vec<_>>();
-        let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(group_buffers)?;
-
-        // The size of the place occupied by row and normal accumulators
-        let extra: usize = self
-            .indices
-            .iter()
-            .flatten()
-            .map(|Range { start, end }| end - start)
-            .sum();
-        let empty_arr = new_null_array(&DataType::Null, 1);
-        output.extend(std::iter::repeat(empty_arr).take(extra));
-
-        // Write results of both accumulator types to the corresponding location in
-        // the output schema:
-        let results = [columns.into_iter(), row_columns.into_iter()];
-        for (outer, mut current) in results.into_iter().enumerate() {
-            for &Range { start, end } in self.indices[outer].iter() {
-                for item in output.iter_mut().take(end).skip(start) {
-                    *item = current.next().expect("Columns cannot be empty");
-                }
-            }
-        }
-
-        // Set status of the emitted groups to GroupStatus::Emitted mode.
-        for gs in self.aggr_state.ordered_group_states[skip_items..end_idx].iter_mut() {
-            if gs.status == GroupStatus::CanEmit {
-                gs.status = GroupStatus::Emitted;
-            }
-        }
-
-        Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
-    }
-}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index b406320dda..9d8ced18da 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -18,8 +18,7 @@
 //! Aggregates functionalities
 
 use crate::physical_plan::aggregates::{
-    bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
-    row_hash::GroupedHashAggregateStream,
+    no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
 };
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
@@ -46,10 +45,9 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-mod bounded_aggregate_stream;
 mod no_grouping;
+mod order;
 mod row_hash;
-mod utils;
 
 pub use datafusion_expr::AggregateFunction;
 use datafusion_physical_expr::aggregate::is_order_sensitive;
@@ -95,7 +93,7 @@ pub enum AggregateMode {
 /// Specifically, each distinct combination of the relevant columns
 /// are contiguous in the input, and once a new combination is seen
 /// previous combinations are guaranteed never to appear again
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum GroupByOrderMode {
     /// The input is not (known to be) ordered by any of the
     /// expressions in the GROUP BY clause.
@@ -218,7 +216,6 @@ impl PartialEq for PhysicalGroupBy {
 enum StreamType {
     AggregateStream(AggregateStream),
     GroupedHashAggregateStream(GroupedHashAggregateStream),
-    BoundedAggregate(BoundedAggregateStream),
 }
 
 impl From<StreamType> for SendableRecordBatchStream {
@@ -226,7 +223,6 @@ impl From<StreamType> for SendableRecordBatchStream {
         match stream {
             StreamType::AggregateStream(stream) => Box::pin(stream),
             StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
-            StreamType::BoundedAggregate(stream) => Box::pin(stream),
         }
     }
 }
@@ -725,14 +721,6 @@ impl AggregateExec {
             Ok(StreamType::AggregateStream(AggregateStream::new(
                 self, context, partition,
             )?))
-        } else if let Some(aggregation_ordering) = &self.aggregation_ordering {
-            let aggregation_ordering = aggregation_ordering.clone();
-            Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
-                self,
-                context,
-                partition,
-                aggregation_ordering,
-            )?))
         } else {
             Ok(StreamType::GroupedHashAggregateStream(
                 GroupedHashAggregateStream::new(self, context, partition)?,
@@ -1116,6 +1104,7 @@ fn create_accumulators(
         .collect::<Result<Vec<_>>>()
 }
 
+#[allow(dead_code)]
 fn create_row_accumulators(
     aggr_expr: &[Arc<dyn AggregateExpr>],
 ) -> Result<Vec<RowAccumulatorItem>> {
diff --git a/datafusion/core/src/physical_plan/aggregates/order/full.rs b/datafusion/core/src/physical_plan/aggregates/order/full.rs
new file mode 100644
index 0000000000..d95433a998
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/full.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+
+use crate::physical_expr::EmitTo;
+
+/// Tracks grouping state when the data is ordered entirely by its
+/// group keys
+///
+/// When the group values are sorted, as soon as we see group `n+1` we
+/// know we will never see any rows for group `n` again and thus they
+/// can be emitted.
+///
+/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
+/// by `id` as soon as a new `id` value is seen all previous values
+/// can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///      ┌─────┐   ┌──────────────────┐
+///      │┌───┐│   │ ┌──────────────┐ │         ┏━━━━━━━━━━━━━━┓
+///      ││ 0 ││   │ │     123      │ │   ┌─────┃      13      ┃
+///      │└───┘│   │ └──────────────┘ │   │     ┗━━━━━━━━━━━━━━┛
+///      │ ... │   │    ...           │   │
+///      │┌───┐│   │ ┌──────────────┐ │   │         current
+///      ││12 ││   │ │     234      │ │   │
+///      │├───┤│   │ ├──────────────┤ │   │
+///      ││12 ││   │ │     234      │ │   │
+///      │├───┤│   │ ├──────────────┤ │   │
+///      ││13 ││   │ │     456      │◀┼───┘
+///      │└───┘│   │ └──────────────┘ │
+///      └─────┘   └──────────────────┘
+///
+///  group indices    group_values        current tracks the most
+/// (in group value                          recent group index
+///      order)
+/// ```
+///
+/// In this diagram, the current group is `13`, and thus groups
+/// `0..=12` can be emitted. Note that `13` can not yet be emitted as
+/// there may be more values in the next batch with the same group_id.
+#[derive(Debug)]
+pub(crate) struct GroupOrderingFull {
+    state: State,
+    /// Hash value of each group seen so far, indexed by group index
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug)]
+enum State {
+    /// Seen no input yet
+    Start,
+
+    /// Data is in progress. `current` is the current group for which
+    /// values are being generated. Groups before `current` can be emitted
+    InProgress { current: usize },
+
+    /// Seen end of input: all groups can be emitted
+    Complete,
+}
+
+impl GroupOrderingFull {
+    pub fn new() -> Self {
+        Self {
+            state: State::Start,
+            hashes: vec![],
+        }
+    }
+
+    /// How many groups can be emitted, or `None` if no data can be emitted
+    pub fn emit_to(&self) -> Option<EmitTo> {
+        match &self.state {
+            State::Start => None,
+            State::InProgress { current, .. } => {
+                if *current == 0 {
+                    // Can not emit while still on the first group
+                    None
+                } else {
+                    // otherwise emit all groups prior to the current group
+                    Some(EmitTo::First(*current))
+                }
+            }
+            State::Complete { .. } => Some(EmitTo::All),
+        }
+    }
+
+    /// Remove the first `n` groups from the internal state, shifting all
+    /// existing indexes down by `n`. Returns the remaining stored hashes
+    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+        match &mut self.state {
+            State::Start => panic!("invalid state: start"),
+            State::InProgress { current } => {
+                // shift down by n (panics if `n` exceeds `current`)
+                assert!(*current >= n);
+                *current -= n;
+                self.hashes.drain(0..n);
+            }
+            State::Complete { .. } => panic!("invalid state: complete"),
+        };
+        &self.hashes
+    }
+
+    /// Note that the input is complete so any outstanding groups are done as well
+    pub fn input_done(&mut self) {
+        self.state = State::Complete;
+    }
+
+    /// Called when new groups are added in a batch. See documentation
+    /// on [`super::GroupOrdering::new_groups`]
+    pub fn new_groups(
+        &mut self,
+        group_indices: &[usize],
+        batch_hashes: &[u64],
+        total_num_groups: usize,
+    ) {
+        assert_ne!(total_num_groups, 0);
+        assert_eq!(group_indices.len(), batch_hashes.len());
+
+        // grow `hashes` to cover every group, then record each row's group hash
+        self.hashes.resize(total_num_groups, 0);
+        for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
+            self.hashes[group_index] = hash;
+        }
+
+        // Update state: the highest group index becomes `current`
+        let max_group_index = total_num_groups - 1;
+        self.state = match self.state {
+            State::Start => State::InProgress {
+                current: max_group_index,
+            },
+            State::InProgress { current } => {
+                // the current group index must never decrease between calls
+                assert!(current <= max_group_index, "{current} <= {max_group_index}");
+                State::InProgress {
+                    current: max_group_index,
+                }
+            }
+            State::Complete { .. } => {
+                panic!("Saw new group after input was complete");
+            }
+        };
+    }
+
+    pub(crate) fn size(&self) -> usize {
+        std::mem::size_of::<Self>() + self.hashes.allocated_size()
+    }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/order/mod.rs b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
new file mode 100644
index 0000000000..4e1da35319
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_physical_expr::EmitTo;
+
+use super::{AggregationOrdering, GroupByOrderMode};
+
+mod full;
+mod partial;
+
+pub(crate) use full::GroupOrderingFull;
+pub(crate) use partial::GroupOrderingPartial;
+
+/// Ordering information for each group in the hash table
+#[derive(Debug)]
+pub(crate) enum GroupOrdering {
+    /// Groups are not ordered
+    None,
+    /// Groups are ordered by some subset of the group keys
+    Partial(GroupOrderingPartial),
+    /// Groups are entirely contiguous (ordered by all group keys)
+    Full(GroupOrderingFull),
+}
+
+impl GroupOrdering {
+    /// Create a `GroupOrdering` for the specified ordering
+    pub fn try_new(
+        input_schema: &Schema,
+        ordering: &AggregationOrdering,
+    ) -> Result<Self> {
+        let AggregationOrdering {
+            mode,
+            order_indices,
+            ordering,
+        } = ordering;
+
+        Ok(match mode {
+            GroupByOrderMode::None => GroupOrdering::None,
+            GroupByOrderMode::PartiallyOrdered => {
+                let partial =
+                    GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
+                GroupOrdering::Partial(partial)
+            }
+            GroupByOrderMode::FullyOrdered => {
+                GroupOrdering::Full(GroupOrderingFull::new())
+            }
+        })
+    }
+
+    /// How many groups can be emitted, or `None` if no data can be emitted
+    pub fn emit_to(&self) -> Option<EmitTo> {
+        match self {
+            GroupOrdering::None => None,
+            GroupOrdering::Partial(partial) => partial.emit_to(),
+            GroupOrdering::Full(full) => full.emit_to(),
+        }
+    }
+
+    /// Updates the state to note that the input is done
+    pub fn input_done(&mut self) {
+        match self {
+            GroupOrdering::None => {}
+            GroupOrdering::Partial(partial) => partial.input_done(),
+            GroupOrdering::Full(full) => full.input_done(),
+        }
+    }
+
+    /// Remove the first `n` groups from the internal state, shifting all
+    /// existing indexes down by `n`. Returns the remaining stored hashes
+    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+        match self {
+            GroupOrdering::None => &[],
+            GroupOrdering::Partial(partial) => partial.remove_groups(n),
+            GroupOrdering::Full(full) => full.remove_groups(n),
+        }
+    }
+
+    /// Called when new groups are added in a batch
+    ///
+    /// * `batch_group_values`: group key values for *each row* in the batch
+    ///
+    /// * `group_indices`: indices for each row in the batch
+    ///
+    /// * `batch_hashes`: hash values for each row in the batch
+    ///
+    /// * `total_num_groups`: total number of groups (so max
+    /// group_index is total_num_groups - 1).
+    pub fn new_groups(
+        &mut self,
+        batch_group_values: &[ArrayRef],
+        group_indices: &[usize],
+        batch_hashes: &[u64],
+        total_num_groups: usize,
+    ) -> Result<()> {
+        match self {
+            GroupOrdering::None => {}
+            GroupOrdering::Partial(partial) => {
+                partial.new_groups(
+                    batch_group_values,
+                    group_indices,
+                    batch_hashes,
+                    total_num_groups,
+                )?;
+            }
+
+            GroupOrdering::Full(full) => {
+                full.new_groups(group_indices, batch_hashes, total_num_groups);
+            }
+        };
+        Ok(())
+    }
+
+    /// Return the size of memory used by the ordering state, in bytes
+    pub(crate) fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + match self {
+                GroupOrdering::None => 0,
+                GroupOrdering::Partial(partial) => partial.size(),
+                GroupOrdering::Full(full) => full.size(),
+            }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
new file mode 100644
index 0000000000..be8cd59671
--- /dev/null
+++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, no more groups with the
+/// previous sort key can appear, so all groups with the previous sort
+/// key and earlier can be emitted.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state`, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///                                            ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+///     ┌─────┐    ┌───────────────────┐ ┌─────┃        9        ┃ ┃ "MD"  ┃
+///     │┌───┐│    │ ┌──────────────┐  │ │     ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+///     ││ 0 ││    │ │  123, "MA"   │  │ │        current_sort      sort_key
+///     │└───┘│    │ └──────────────┘  │ │
+///     │ ... │    │    ...            │ │      current_sort tracks the
+///     │┌───┐│    │ ┌──────────────┐  │ │      smallest group index that had
+///     ││ 8 ││    │ │  765, "MA"   │  │ │      the same sort_key as current
+///     │├───┤│    │ ├──────────────┤  │ │
+///     ││ 9 ││    │ │  923, "MD"   │◀─┼─┘
+///     │├───┤│    │ ├──────────────┤  │        ┏━━━━━━━━━━━━━━┓
+///     ││10 ││    │ │  345, "MD"   │  │  ┌─────┃      11      ┃
+///     │├───┤│    │ ├──────────────┤  │  │     ┗━━━━━━━━━━━━━━┛
+///     ││11 ││    │ │  124, "MD"   │◀─┼──┘         current
+///     │└───┘│    │ └──────────────┘  │
+///     └─────┘    └───────────────────┘
+///
+///  group indices
+/// (in group value  group_values               current tracks the most
+///      order)                                    recent group index
+///```
+#[derive(Debug)]
+pub(crate) struct GroupOrderingPartial {
+    /// State machine
+    state: State,
+
+    /// The indexes of the group by columns that form the sort key.
+    /// For example if grouping by `id, state` and ordered by `state`
+    /// this would be `[1]`.
+    order_indices: Vec<usize>,
+
+    /// Converter for the sort key (used on the group columns
+    /// specified in `order_indices`)
+    row_converter: RowConverter,
+
+    /// Hash value of every group, indexed by group index
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug, Default)]
+enum State { // state machine deciding which groups can be emitted
+    /// The ordering was temporarily taken.  `Self::Taken` is left
+    /// when state must be temporarily taken to satisfy the borrow
+    /// checker. If an error happens before the state can be restored,
+    /// the ordering information is lost and execution cannot
+    /// proceed, but there is no undefined behavior.
+    #[default] // lets `std::mem::take` temporarily move the state out
+    Taken,
+
+    /// No input has been seen yet
+    Start,
+
+    /// Some input has been seen; more may still follow.
+    InProgress {
+        /// Smallest group index that has `sort_key`
+        current_sort: usize,
+        /// The sort key of group_index `current_sort`
+        sort_key: OwnedRow,
+        /// Largest group index assigned so far (the most recently
+        /// created group)
+        current: usize,
+    },
+
+    /// End of input has been seen; all groups can be emitted
+    Complete,
+}
+
+impl GroupOrderingPartial {
+    pub fn try_new(
+        input_schema: &Schema,
+        order_indices: &[usize],
+        ordering: &[PhysicalSortExpr],
+    ) -> Result<Self> {
+        assert!(!order_indices.is_empty()); // at least one group column must be ordered
+        assert_eq!(order_indices.len(), ordering.len()); // one sort expr per ordered column
+
+        let fields = ordering
+            .iter()
+            .map(|sort_expr| {
+                Ok(SortField::new_with_options(
+                    sort_expr.expr.data_type(input_schema)?,
+                    sort_expr.options,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self {
+            state: State::Start,
+            order_indices: order_indices.to_vec(),
+            row_converter: RowConverter::new(fields)?,
+            hashes: vec![],
+        })
+    }
+
+    /// Creates the sort key of each row from the batch group values
+    ///
+    /// For example, if group_values had `A, B, C` but the input was
+    /// only sorted on `B` and `C` this should return rows for (`B`,
+    /// `C`)
+    fn compute_sort_keys(&mut self, group_values: &[ArrayRef]) -> Result<Rows> {
+        // Take only the columns that are in the sort key
+        let sort_values: Vec<_> = self
+            .order_indices
+            .iter()
+            .map(|&idx| group_values[idx].clone())
+            .collect();
+
+        Ok(self.row_converter.convert_columns(&sort_values)?)
+    }
+
+    /// How many groups can be emitted, or None if no data can be emitted
+    pub fn emit_to(&self) -> Option<EmitTo> {
+        match &self.state {
+            State::Taken => unreachable!("State previously taken"),
+            State::Start => None,
+            State::InProgress { current_sort, .. } => {
+                // Nothing can be emitted while every group still has the
+                // first sort key; otherwise emit all groups with earlier sort keys
+                // (group indexes 0..current_sort)
+                if *current_sort == 0 {
+                    None
+                } else {
+                    Some(EmitTo::First(*current_sort))
+                }
+            }
+            State::Complete => Some(EmitTo::All),
+        }
+    }
+
+    /// Removes the first `n` groups from the internal state, shifting
+    /// all remaining indexes down by `n`. Returns the remaining hash values
+    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+        match &mut self.state {
+            State::Taken => unreachable!("State previously taken"),
+            State::Start => panic!("invalid state: start"),
+            State::InProgress {
+                current_sort,
+                current,
+                sort_key: _,
+            } => {
+                // shift indexes down by n
+                assert!(*current >= n);
+                *current -= n;
+                assert!(*current_sort >= n);
+                *current_sort -= n;
+                // Note sort_key stays the same, we are just translating group indexes
+                self.hashes.drain(0..n);
+            }
+            State::Complete { .. } => panic!("invalid state: complete"),
+        };
+        &self.hashes
+    }
+
+    /// Notes that the input is complete, so all remaining groups can be emitted
+    pub fn input_done(&mut self) {
+        self.state = match self.state {
+            State::Taken => unreachable!("State previously taken"),
+            _ => State::Complete,
+        };
+    }
+
+    /// Called when new groups are added in a batch. See documentation
+    /// on [`super::GroupOrdering::new_groups`]
+    pub fn new_groups(
+        &mut self,
+        batch_group_values: &[ArrayRef],
+        group_indices: &[usize],
+        batch_hashes: &[u64],
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert!(total_num_groups > 0);
+        assert!(!batch_group_values.is_empty());
+        assert_eq!(group_indices.len(), batch_hashes.len());
+
+        let max_group_index = total_num_groups - 1;
+
+        // compute the sort key values for each row in the batch
+        let sort_keys = self.compute_sort_keys(batch_group_values)?;
+        assert_eq!(sort_keys.num_rows(), batch_hashes.len());
+
+        let old_state = std::mem::take(&mut self.state); // leaves State::Taken until restored below
+        let (mut current_sort, mut sort_key) = match &old_state {
+            State::Taken => unreachable!("State previously taken"),
+            State::Start => (0, sort_keys.row(0)),
+            State::InProgress {
+                current_sort,
+                sort_key,
+                ..
+            } => (*current_sort, sort_key.row()),
+            State::Complete => {
+                panic!("Saw new group after the end of input");
+            }
+        };
+
+        // copy any hash values, and find latest sort key
+        self.hashes.resize(total_num_groups, 0);
+        let iter = group_indices
+            .iter()
+            .zip(batch_hashes.iter())
+            .zip(sort_keys.iter());
+
+        for ((&group_index, &hash), group_sort_key) in iter {
+            self.hashes[group_index] = hash;
+
+            // Does this row start a new sort_key?
+            if sort_key != group_sort_key {
+                current_sort = group_index;
+                sort_key = group_sort_key;
+            }
+        }
+
+        self.state = State::InProgress {
+            current_sort,
+            sort_key: sort_key.owned(),
+            current: max_group_index,
+        };
+
+        Ok(())
+    }
+
+    /// Return the size of memory allocated by this structure (not counting the `sort_key` row)
+    pub(crate) fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.order_indices.allocated_size()
+            + self.row_converter.size()
+            + self.hashes.allocated_size()
+    }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index c57f436324..b48e8f38e9 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Hash aggregation through row format
-//!
-//! POC demonstration of GroupByHashApproach
+//! Hash aggregation
 
 use datafusion_physical_expr::{
-    AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+    AggregateExpr, EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
 };
 use log::debug;
 use std::sync::Arc;
@@ -44,7 +42,7 @@ use arrow::array::*;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryDelta, MemoryReservation};
 use datafusion_execution::TaskContext;
 use hashbrown::raw::RawTable;
 
@@ -58,6 +56,7 @@ pub(crate) enum ExecutionState {
     Done,
 }
 
+use super::order::GroupOrdering;
 use super::AggregateExec;
 
 /// Hash based Grouping Aggregator
@@ -199,6 +198,14 @@ pub(crate) struct GroupedHashAggregateStream {
 
     /// max rows in output RecordBatches
     batch_size: usize,
+
+    /// Optional ordering information, that might allow groups to be
+    /// emitted from the hash table prior to seeing the end of the
+    /// input
+    group_ordering: GroupOrdering,
+
+    /// Have we seen the end of the input
+    input_done: bool,
 }
 
 impl GroupedHashAggregateStream {
@@ -258,6 +265,16 @@ impl GroupedHashAggregateStream {
         let map = RawTable::with_capacity(0);
         let group_values = row_converter.empty_rows(0, 0);
 
+        let group_ordering = agg
+            .aggregation_ordering
+            .as_ref()
+            .map(|aggregation_ordering| {
+                GroupOrdering::try_new(&group_schema, aggregation_ordering)
+            })
+            // return error if any
+            .transpose()?
+            .unwrap_or(GroupOrdering::None);
+
         timer.done();
 
         let exec_state = ExecutionState::ReadingInput;
@@ -279,6 +296,8 @@ impl GroupedHashAggregateStream {
             baseline_metrics,
             random_state: Default::default(),
             batch_size,
+            group_ordering,
+            input_done: false,
         })
     }
 }
@@ -303,6 +322,16 @@ fn create_group_accumulator(
     }
 }
 
+/// Evaluates to `v` for `Ok(v)`; for `Err(e)`, returns `Poll::Ready(Some(Err(e)))` from the caller
+macro_rules! extract_ok {
+    ($RES: expr) => {{
+        match $RES {
+            Ok(v) => v,
+            Err(e) => return Poll::Ready(Some(Err(e))),
+        }
+    }};
+}
+
 impl Stream for GroupedHashAggregateStream {
     type Item = Result<RecordBatch>;
 
@@ -320,36 +349,33 @@ impl Stream for GroupedHashAggregateStream {
                         // new batch to aggregate
                         Some(Ok(batch)) => {
                             let timer = elapsed_compute.timer();
-                            let result = self.group_aggregate_batch(batch);
-                            timer.done();
-
-                            // allocate memory AFTER we actually used
-                            // the memory, which simplifies the whole
-                            // accounting and we are OK with
-                            // overshooting a bit.
-                            //
-                            // Also this means we either store the
-                            // whole record batch or not.
-                            let result = result.and_then(|allocated| {
-                                self.reservation.try_grow(allocated)
-                            });
-
-                            if let Err(e) = result {
-                                return Poll::Ready(Some(Err(e)));
+                            // Do the grouping
+                            extract_ok!(self.group_aggregate_batch(batch));
+
+                            // If we can begin emitting rows, do so,
+                            // otherwise keep consuming input
+                            assert!(!self.input_done);
+                            let to_emit = self.group_ordering.emit_to();
+
+                            if let Some(to_emit) = to_emit {
+                                let batch =
+                                    extract_ok!(self.create_batch_from_map(to_emit));
+                                self.exec_state = ExecutionState::ProducingOutput(batch);
                             }
+                            timer.done();
+                        }
+                        Some(Err(e)) => {
+                            // inner had error, return to caller
+                            return Poll::Ready(Some(Err(e)));
                         }
-                        // inner had error, return to caller
-                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
-                        // inner is done, producing output
                         None => {
+                            // inner is done, emit all rows and switch to producing output
+                            self.input_done = true;
+                            self.group_ordering.input_done();
                             let timer = elapsed_compute.timer();
-                            match self.create_batch_from_map() {
-                                Ok(batch) => {
-                                    self.exec_state =
-                                        ExecutionState::ProducingOutput(batch)
-                                }
-                                Err(e) => return Poll::Ready(Some(Err(e))),
-                            }
+                            let batch =
+                                extract_ok!(self.create_batch_from_map(EmitTo::All));
+                            self.exec_state = ExecutionState::ProducingOutput(batch);
                             timer.done();
                         }
                     }
@@ -358,7 +384,11 @@ impl Stream for GroupedHashAggregateStream {
                 ExecutionState::ProducingOutput(batch) => {
                     // slice off a part of the batch, if needed
                     let output_batch = if batch.num_rows() <= self.batch_size {
-                        self.exec_state = ExecutionState::Done;
+                        if self.input_done {
+                            self.exec_state = ExecutionState::Done;
+                        } else {
+                            self.exec_state = ExecutionState::ReadingInput
+                        }
                         batch
                     } else {
                         // output first batch_size rows
@@ -397,7 +427,7 @@ impl GroupedHashAggregateStream {
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
-        allocated: &mut usize,
+        memory_delta: &mut MemoryDelta,
     ) -> Result<()> {
         // Convert the group keys into the row format
         // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
@@ -405,8 +435,7 @@ impl GroupedHashAggregateStream {
         let n_rows = group_rows.num_rows();
 
         // track memory used
-        let group_values_size_pre = self.group_values.size();
-        let scratch_size_pre = self.scratch_space.size();
+        memory_delta.dec(self.state_size());
 
         // tracks to which group each of the input rows belongs
         let group_indices = &mut self.scratch_space.current_group_indices;
@@ -418,6 +447,8 @@ impl GroupedHashAggregateStream {
         batch_hashes.resize(n_rows, 0);
         create_hashes(group_values, &self.random_state, batch_hashes)?;
 
+        let mut allocated = 0;
+        let starting_num_groups = self.group_values.num_rows();
         for (row, &hash) in batch_hashes.iter().enumerate() {
             let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
@@ -439,35 +470,40 @@ impl GroupedHashAggregateStream {
                     self.map.insert_accounted(
                         (hash, group_idx),
                         |(hash, _group_index)| *hash,
-                        allocated,
+                        &mut allocated,
                     );
                     group_idx
                 }
             };
             group_indices.push(group_idx);
         }
+        memory_delta.inc(allocated);
+
+        // Update ordering information if necessary
+        let total_num_groups = self.group_values.num_rows();
+        if total_num_groups > starting_num_groups {
+            self.group_ordering.new_groups(
+                group_values,
+                group_indices,
+                batch_hashes,
+                total_num_groups,
+            )?;
+        }
 
-        // account for memory growth in scratch space
-        *allocated += self.scratch_space.size();
-        *allocated -= scratch_size_pre; // subtract after adding to avoid underflow
-
-        // account for any memory increase used to store group_values
-        *allocated += self.group_values.size();
-        *allocated -= group_values_size_pre; // subtract after adding to avoid underflow
+        // account for memory change
+        memory_delta.inc(self.state_size());
 
         Ok(())
     }
 
     /// Perform group-by aggregation for the given [`RecordBatch`].
-    ///
-    /// If successful, returns the additional amount of memory, in
-    /// bytes, that were allocated during this process.
-    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<usize> {
+    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
         // Evaluate the grouping expressions
         let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
 
         // Keep track of memory allocated:
-        let mut allocated = 0usize;
+        let mut memory_delta = MemoryDelta::new();
+        memory_delta.dec(self.state_size());
 
         // Evaluate the aggregation expressions.
         let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
@@ -475,11 +511,9 @@ impl GroupedHashAggregateStream {
         // Evaluate the filter expressions, if any, against the inputs
         let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
 
-        let row_converter_size_pre = self.row_converter.size();
-
         for group_values in &group_by_values {
             // calculate the group indices for each input row
-            self.update_group_state(group_values, &mut allocated)?;
+            self.update_group_state(group_values, &mut memory_delta)?;
             let group_indices = &self.scratch_space.current_group_indices;
 
             // Gather the inputs to call the actual accumulator
@@ -492,7 +526,7 @@ impl GroupedHashAggregateStream {
             let total_num_groups = self.group_values.num_rows();
 
             for ((acc, values), opt_filter) in t {
-                let acc_size_pre = acc.size();
+                memory_delta.dec(acc.size());
                 let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
 
                 // Call the appropriate method on each aggregator with
@@ -519,40 +553,100 @@ impl GroupedHashAggregateStream {
                         )?;
                     }
                 }
-
-                allocated += acc.size();
-                allocated -= acc_size_pre;
+                memory_delta.inc(acc.size());
             }
         }
-        allocated += self.row_converter.size();
-        allocated -= row_converter_size_pre;
+        memory_delta.inc(self.state_size());
 
-        Ok(allocated)
+        // Update allocation AFTER it is used, simplifying accounting,
+        // though it results in a temporary overshoot.
+        memory_delta.update(&mut self.reservation)
     }
 
-    /// Create an output RecordBatch with all group keys and accumulator states/values
-    fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
+    /// Creates an output RecordBatch with the group keys and accumulator
+    /// states/values selected by `emit_to`, then calls `remove_emitted(emit_to)`
+    fn create_batch_from_map(&mut self, emit_to: EmitTo) -> Result<RecordBatch> {
         if self.group_values.num_rows() == 0 {
-            let schema = self.schema.clone();
-            return Ok(RecordBatch::new_empty(schema));
+            return Ok(RecordBatch::new_empty(self.schema()));
         }
 
+        let output = self.build_output(emit_to)?;
+        self.remove_emitted(emit_to)?;
+        let batch = RecordBatch::try_new(self.schema(), output)?;
+        Ok(batch)
+    }
+
+    /// Creates output columns `(group 1, group 2, ... agg 1, agg 2, ...)` for the groups in `emit_to`
+    fn build_output(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
         // First output rows are the groups
-        let groups_rows = self.group_values.iter();
-        let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;
+        let mut output: Vec<ArrayRef> = match emit_to {
+            EmitTo::All => {
+                let groups_rows = self.group_values.iter();
+                self.row_converter.convert_rows(groups_rows)?
+            }
+            EmitTo::First(n) => {
+                let groups_rows = self.group_values.iter().take(n);
+                self.row_converter.convert_rows(groups_rows)?
+            }
+        };
 
-        // Next output each aggregate value, from the accumulators
+        // Next output each aggregate's state (Partial mode) or final value
         for acc in self.accumulators.iter_mut() {
             match self.mode {
-                AggregateMode::Partial => output.extend(acc.state()?),
+                AggregateMode::Partial => output.extend(acc.state(emit_to)?),
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
                 | AggregateMode::Single
-                | AggregateMode::SinglePartitioned => output.push(acc.evaluate()?),
+                | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?),
             }
         }
 
-        Ok(RecordBatch::try_new(self.schema.clone(), output)?)
+        Ok(output)
+    }
+
+    /// For `EmitTo::First(n)`, removes the first `n` groups and renumbers
+    /// the remaining groups from 0; `EmitTo::All` currently removes nothing
+    fn remove_emitted(&mut self, emit_to: EmitTo) -> Result<()> {
+        let mut memory_delta = MemoryDelta::new();
+        memory_delta.dec(self.state_size());
+
+        match emit_to {
+            EmitTo::All => {
+                // Eventually we may also want to clear the hash table here
+                //self.map.clear();
+            }
+            EmitTo::First(n) => {
+                // Drop the first n group keys by copying the remaining ones to a new Rows.
+                // TODO file some ticket in arrow-rs to make this more efficient?
+                let mut new_group_values = self.row_converter.empty_rows(0, 0);
+                for row in self.group_values.iter().skip(n) {
+                    new_group_values.push(row);
+                }
+                std::mem::swap(&mut new_group_values, &mut self.group_values);
+
+                // rebuild hash table (maybe we should remove the
+                // entries for each group that was emitted rather than
+                // rebuilding the whole thing)
+
+                let hashes = self.group_ordering.remove_groups(n);
+                assert_eq!(hashes.len(), self.group_values.num_rows());
+                self.map.clear();
+                for (idx, &hash) in hashes.iter().enumerate() {
+                    self.map.insert(hash, (hash, idx), |(hash, _)| *hash);
+                }
+            }
+        };
+        // account for the state_size() change; the rebuilt `map` is not re-accounted here
+        memory_delta.inc(self.state_size());
+        memory_delta.update(&mut self.reservation)
+    }
+
+    /// Returns the current size, in bytes, of the variable-size state (`map` is accounted separately)
+    fn state_size(&self) -> usize {
+        self.group_values.size()
+            + self.scratch_space.size()
+            + self.group_ordering.size()
+            + self.row_converter.size()
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/aggregates/utils.rs b/datafusion/core/src/physical_plan/aggregates/utils.rs
deleted file mode 100644
index a55464edd1..0000000000
--- a/datafusion/core/src/physical_plan/aggregates/utils.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This file contains various utility functions that are common to both
-//! batch and streaming aggregation code.
-
-use crate::physical_plan::aggregates::AccumulatorItem;
-use arrow::compute;
-use arrow::compute::filter;
-use arrow::row::OwnedRow;
-use arrow_array::types::UInt32Type;
-use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
-use arrow_schema::{Schema, SchemaRef};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::utils::get_arrayref_at_indices;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
-use datafusion_physical_expr::AggregateExpr;
-use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::MutableRecordBatch;
-use std::sync::Arc;
-
-/// This object encapsulates the state that is built for each output group.
-#[derive(Debug)]
-pub(crate) struct GroupState {
-    /// The actual group by values, stored sequentially
-    pub group_by_values: OwnedRow,
-
-    // Accumulator state, stored sequentially
-    pub aggregation_buffer: Vec<u8>,
-
-    // Accumulator state, one for each aggregate that doesn't support row accumulation
-    pub accumulator_set: Vec<AccumulatorItem>,
-
-    /// Scratch space used to collect indices for input rows in a
-    /// batch that have values to aggregate, reset on each batch.
-    pub indices: Vec<u32>,
-}
-
-#[derive(Debug)]
-/// This object tracks the aggregation phase.
-pub(crate) enum ExecutionState {
-    ReadingInput,
-    ProducingOutput,
-    Done,
-}
-
-pub(crate) fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> SchemaRef {
-    let fields = aggr_expr
-        .iter()
-        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
-        .collect::<Vec<_>>();
-    Arc::new(Schema::new(fields))
-}
-
-pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
-    let mut output = MutableRecordBatch::new(rows.len(), Arc::new(schema.clone()));
-    let mut row = RowReader::new(schema);
-
-    for data in rows {
-        row.point_to(0, data);
-        read_row(&row, &mut output, schema);
-    }
-
-    output.output_as_columns()
-}
-
-pub(crate) fn get_at_indices(
-    input_values: &[Vec<ArrayRef>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    input_values
-        .iter()
-        .map(|array| get_arrayref_at_indices(array, batch_indices))
-        .collect()
-}
-
-pub(crate) fn get_optional_filters(
-    original_values: &[Option<Arc<dyn Array>>],
-    batch_indices: &PrimitiveArray<UInt32Type>,
-) -> Vec<Option<Arc<dyn Array>>> {
-    original_values
-        .iter()
-        .map(|array| {
-            array.as_ref().map(|array| {
-                compute::take(
-                    array.as_ref(),
-                    batch_indices,
-                    None, // None: no index check
-                )
-                .unwrap()
-            })
-        })
-        .collect()
-}
-
-pub(crate) fn slice_and_maybe_filter(
-    aggr_array: &[ArrayRef],
-    filter_opt: Option<&Arc<dyn Array>>,
-    offsets: &[usize],
-) -> Result<Vec<ArrayRef>> {
-    let (offset, length) = (offsets[0], offsets[1] - offsets[0]);
-    let sliced_arrays: Vec<ArrayRef> = aggr_array
-        .iter()
-        .map(|array| array.slice(offset, length))
-        .collect();
-
-    if let Some(f) = filter_opt {
-        let sliced = f.slice(offset, length);
-        let filter_array = as_boolean_array(&sliced)?;
-
-        sliced_arrays
-            .iter()
-            .map(|array| filter(array, filter_array).map_err(DataFusionError::ArrowError))
-            .collect()
-    } else {
-        Ok(sliced_arrays)
-    }
-}
-
-/// This method is similar to Scalar::try_from_array except for the Null handling.
-/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)].
-pub(crate) fn col_to_scalar(
-    array: &ArrayRef,
-    filter: &Option<&BooleanArray>,
-    row_index: usize,
-) -> Result<ScalarValue> {
-    if array.is_null(row_index) {
-        return Ok(ScalarValue::Null);
-    }
-    if let Some(filter) = filter {
-        if !filter.value(row_index) {
-            return Ok(ScalarValue::Null);
-        }
-    }
-    ScalarValue::try_from_array(array, row_index)
-}
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 74dd9ee1d1..e27c133412 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -108,9 +108,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
         .collect::<Vec<_>>();
     let group_by = PhysicalGroupBy::new_single(expr);
 
-    println!("aggregate_expr: {aggregate_expr:?}");
-    println!("group_by: {group_by:?}");
-
     let aggregate_exec_running = Arc::new(
         AggregateExec::try_new(
             AggregateMode::Partial,
@@ -170,6 +167,8 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
             (i, usual_line),
             (i, running_line),
             "Inconsistent result\n\n\
+             Aggregate_expr: {aggregate_expr:?}\n\
+             group_by: {group_by:?}\n\
              Left Plan:\n{}\n\
              Right Plan:\n{}\n\
              schema:\n{schema}\n\
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index d002cda8d8..fe077524a4 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -18,7 +18,7 @@
 //! Manages all available memory during query execution
 
 use datafusion_common::Result;
-use std::sync::Arc;
+use std::{cmp::Ordering, sync::Arc};
 
 mod pool;
 pub mod proxy;
@@ -157,7 +157,6 @@ impl MemoryReservation {
 
     /// Sets the size of this reservation to `capacity`
     pub fn resize(&mut self, capacity: usize) {
-        use std::cmp::Ordering;
         match capacity.cmp(&self.size) {
             Ordering::Greater => self.grow(capacity - self.size),
             Ordering::Less => self.shrink(self.size - capacity),
@@ -167,7 +166,6 @@ impl MemoryReservation {
 
     /// Try to set the size of this reservation to `capacity`
     pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
-        use std::cmp::Ordering;
         match capacity.cmp(&self.size) {
             Ordering::Greater => self.try_grow(capacity - self.size)?,
             Ordering::Less => self.shrink(self.size - capacity),
@@ -221,6 +219,49 @@ pub fn human_readable_size(size: usize) -> String {
     format!("{value:.1} {unit}")
 }
 
+/// Tracks the change in memory to avoid overflow. Typically, this
+/// is used like the following
+///
+/// 1. Call `delta.dec(sized_thing.size())`
+///
+/// 2. potentially change size of `sized_thing`
+///
+/// 3. Call `delta.inc(sized_thing.size())`
+#[derive(Debug, Default)]
+pub struct MemoryDelta {
+    decrease: usize,
+    increase: usize,
+}
+
+impl MemoryDelta {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Records a size being 'decremented'. This is used to record the
+    /// initial size of some allocation prior to a change.
+    pub fn dec(&mut self, sz: usize) {
+        self.decrease += sz;
+    }
+
+    /// Records a size being 'incremented'. This is used to record
+    /// the final size of some object.
+    pub fn inc(&mut self, sz: usize) {
+        self.increase += sz;
+    }
+
+    /// Applies the net delta: `shrink`s the reservation or `try_grow`s it (may fail)
+    pub fn update(self, reservation: &mut MemoryReservation) -> Result<()> {
+        let Self { decrease, increase } = self;
+        match increase.cmp(&decrease) {
+            Ordering::Less => reservation.shrink(decrease - increase),
+            Ordering::Equal => {}
+            Ordering::Greater => reservation.try_grow(increase - decrease)?,
+        };
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index e95e9fcf87..6ad0f94e03 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -48,6 +48,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 
+use super::groups_accumulator::EmitTo;
 use super::utils::{adjust_output_array, Decimal128Averager};
 
 /// AVG aggregate expression
@@ -560,10 +561,10 @@ where
         Ok(())
     }
 
-    fn evaluate(&mut self) -> Result<ArrayRef> {
-        let counts = std::mem::take(&mut self.counts);
-        let sums = std::mem::take(&mut self.sums);
-        let nulls = self.null_state.build();
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let counts = emit_to.take_needed(&mut self.counts);
+        let sums = emit_to.take_needed(&mut self.sums);
+        let nulls = self.null_state.build(emit_to);
 
         assert_eq!(nulls.len(), sums.len());
         assert_eq!(counts.len(), sums.len());
@@ -598,12 +599,14 @@ where
     }
 
     // return arrays for sums and counts
-    fn state(&mut self) -> Result<Vec<ArrayRef>> {
-        let nulls = Some(self.null_state.build());
-        let counts = std::mem::take(&mut self.counts);
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let nulls = self.null_state.build(emit_to);
+        let nulls = Some(nulls);
+
+        let counts = emit_to.take_needed(&mut self.counts);
         let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy
 
-        let sums = std::mem::take(&mut self.sums);
+        let sums = emit_to.take_needed(&mut self.sums);
         let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
         let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
 
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index e0b9ffd81a..60e15a673a 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -41,6 +41,7 @@ use datafusion_row::accessor::RowAccessor;
 use crate::expressions::format_state_name;
 
 use super::groups_accumulator::accumulate::accumulate_indices;
+use super::groups_accumulator::EmitTo;
 
 /// COUNT aggregate expression
 /// Returns the amount of non-null values of the given expression.
@@ -171,8 +172,8 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(())
     }
 
-    fn evaluate(&mut self) -> Result<ArrayRef> {
-        let counts = std::mem::take(&mut self.counts);
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let counts = emit_to.take_needed(&mut self.counts);
 
         // Count is always non null (null inputs just don't contribute to the overall values)
         let nulls = None;
@@ -182,8 +183,8 @@ impl GroupsAccumulator for CountGroupsAccumulator {
     }
 
     // return arrays for counts
-    fn state(&mut self) -> Result<Vec<ArrayRef>> {
-        let counts = std::mem::take(&mut self.counts);
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let counts = emit_to.take_needed(&mut self.counts);
         let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls
         Ok(vec![Arc::new(counts) as ArrayRef])
     }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
index bcc9d30bed..596265a737 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs
@@ -21,7 +21,9 @@
 
 use arrow::datatypes::ArrowPrimitiveType;
 use arrow_array::{Array, BooleanArray, PrimitiveArray};
-use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer};
+
+use crate::EmitTo;
 
 /// Track the accumulator null state per row: if any values for that
 /// group were null and if any values have been seen at all for that group.
@@ -317,13 +319,30 @@ impl NullState {
         }
     }
 
-    /// Creates the final [`NullBuffer`] representing which
-    /// group_indices should have null values (because they never saw
-    /// any values)
+    /// Creates a [`NullBuffer`] representing which group_indices
+    /// should have null values (because they never saw any values)
+    /// for the `emit_to` rows.
     ///
-    /// resets the internal state to empty
-    pub fn build(&mut self) -> NullBuffer {
-        NullBuffer::new(self.seen_values.finish())
+    /// Resets internal state: all of it for `All`, only the first `n` rows for `First(n)`
+    pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+        let nulls: BooleanBuffer = self.seen_values.finish();
+
+        let nulls = match emit_to {
+            EmitTo::All => nulls,
+            EmitTo::First(n) => {
+                // split off the first N values in seen_values
+                //
+                // TODO make this more efficient rather than two
+                // copies and bitwise manipulation
+                let first_n_null: BooleanBuffer = nulls.iter().take(n).collect();
+                // `finish` emptied seen_values; re-append the values after the first n
+                for seen in nulls.iter().skip(n) {
+                    self.seen_values.append(seen);
+                }
+                first_n_null
+            }
+        };
+        NullBuffer::new(nulls)
     }
 }
 
@@ -689,7 +708,7 @@ mod test {
             // Validate the final buffer (one value per group)
             let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
 
-            let null_buffer = null_state.build();
+            let null_buffer = null_state.build(EmitTo::All);
 
             assert_eq!(null_buffer, expected_null_buffer);
         }
@@ -806,7 +825,7 @@ mod test {
             // Validate the final buffer (one value per group)
             let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
 
-            let null_buffer = null_state.build();
+            let null_buffer = null_state.build(EmitTo::All);
 
             assert_eq!(null_buffer, expected_null_buffer);
         }
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
index 7b4c61fe7d..dcc8c37e74 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -17,7 +17,7 @@
 
 //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
 
-use super::GroupsAccumulator;
+use super::{EmitTo, GroupsAccumulator};
 use arrow::{
     array::{AsArray, UInt32Builder},
     compute,
@@ -44,8 +44,10 @@ pub struct GroupsAccumulatorAdapter {
 
     /// Current memory usage, in bytes.
     ///
-    /// Note this is incrementally updated to avoid size() being a
-    /// bottleneck, which we saw in earlier implementations.
+    /// Note this is incrementally updated with deltas to avoid the
+    /// call to size() being a bottleneck. We saw size() being a
+    /// bottleneck in earlier implementations when there were many
+    /// distinct groups.
     allocation_bytes: usize,
 }
 
@@ -71,7 +73,7 @@ impl AccumulatorState {
     fn size(&self) -> usize {
         self.accumulator.size()
             + std::mem::size_of_val(self)
-            + std::mem::size_of::<u32>() * self.indices.capacity()
+            + self.indices.allocated_size()
     }
 }
 
@@ -82,40 +84,29 @@ impl GroupsAccumulatorAdapter {
     where
         F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
     {
-        let mut new_self = Self {
+        Self {
             factory: Box::new(factory),
             states: vec![],
             allocation_bytes: 0,
-        };
-        new_self.reset_allocation();
-        new_self
-    }
-
-    // Reset the allocation bytes to empty state
-    fn reset_allocation(&mut self) {
-        assert!(self.states.is_empty());
-        self.allocation_bytes = std::mem::size_of::<GroupsAccumulatorAdapter>();
+        }
     }
 
     /// Ensure that self.accumulators has total_num_groups
     fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
         // can't shrink
         assert!(total_num_groups >= self.states.len());
-        let vec_size_pre =
-            std::mem::size_of::<AccumulatorState>() * self.states.capacity();
+        let vec_size_pre = self.states.allocated_size();
 
         // instantiate new accumulators
         let new_accumulators = total_num_groups - self.states.len();
         for _ in 0..new_accumulators {
             let accumulator = (self.factory)()?;
             let state = AccumulatorState::new(accumulator);
-            self.allocation_bytes += state.size();
+            self.add_allocation(state.size());
             self.states.push(state);
         }
 
-        self.allocation_bytes +=
-            std::mem::size_of::<AccumulatorState>() * self.states.capacity();
-        self.allocation_bytes -= vec_size_pre;
+        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
         Ok(())
     }
 
@@ -204,9 +195,11 @@ impl GroupsAccumulatorAdapter {
         // RecordBatch(es)
         let iter = groups_with_rows.iter().zip(offsets.windows(2));
 
+        let mut sizes_pre = 0;
+        let mut sizes_post = 0;
         for (&group_idx, offsets) in iter {
             let state = &mut self.states[group_idx];
-            let size_pre = state.size();
+            sizes_pre += state.size();
 
             let values_to_accumulate =
                 slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
@@ -215,12 +208,40 @@ impl GroupsAccumulatorAdapter {
             // clear out the state so they are empty for next
             // iteration
             state.indices.clear();
-
-            self.allocation_bytes += state.size();
-            self.allocation_bytes -= size_pre;
+            sizes_post += state.size();
         }
+
+        self.adjust_allocation(sizes_pre, sizes_post);
         Ok(())
     }
+
+    /// Increment the allocation by `size`
+    ///
+    /// See [`Self::allocation_bytes`] for rationale.
+    fn add_allocation(&mut self, size: usize) {
+        self.allocation_bytes += size;
+    }
+
+    /// Decrease the allocation by `size` (saturating at zero)
+    ///
+    /// See [`Self::allocation_bytes`] for rationale.
+    fn free_allocation(&mut self, size: usize) {
+        // use saturating sub to avoid errors if the accumulators
+        // report erroneous sizes
+        self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
+    }
+
+    /// Adjusts the allocation for something that started with
+    /// `old_size` and now has `new_size`, without underflowing `usize`
+    ///
+    /// See [`Self::allocation_bytes`] for rationale.
+    fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
+        if new_size > old_size {
+            self.add_allocation(new_size - old_size)
+        } else {
+            self.free_allocation(old_size - new_size)
+        }
+    }
 }
 
 impl GroupsAccumulator for GroupsAccumulatorAdapter {
@@ -243,27 +264,36 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
         Ok(())
     }
 
-    fn evaluate(&mut self) -> Result<ArrayRef> {
-        let states = std::mem::take(&mut self.states);
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let vec_size_pre = self.states.allocated_size();
+
+        let states = emit_to.take_needed(&mut self.states);
 
         let results: Vec<ScalarValue> = states
             .into_iter()
-            .map(|state| state.accumulator.evaluate())
+            .map(|state| {
+                self.free_allocation(state.size());
+                state.accumulator.evaluate()
+            })
             .collect::<Result<_>>()?;
 
         let result = ScalarValue::iter_to_array(results);
-        self.reset_allocation();
+
+        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
+
         result
     }
 
-    fn state(&mut self) -> Result<Vec<ArrayRef>> {
-        let states = std::mem::take(&mut self.states);
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let vec_size_pre = self.states.allocated_size();
+        let states = emit_to.take_needed(&mut self.states);
 
         // each accumulator produces a potential vector of values
         // which we need to form into columns
         let mut results: Vec<Vec<ScalarValue>> = vec![];
 
         for state in states {
+            self.free_allocation(state.size());
             let accumulator_state = state.accumulator.state()?;
             results.resize_with(accumulator_state.len(), Vec::new);
             for (idx, state_val) in accumulator_state.into_iter().enumerate() {
@@ -284,8 +314,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
                 assert_eq!(arr.len(), first_col.len())
             }
         }
+        self.adjust_allocation(vec_size_pre, self.states.allocated_size());
 
-        self.reset_allocation();
         Ok(arrays)
     }
 
@@ -302,7 +332,8 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
             opt_filter,
             total_num_groups,
             |accumulator, values_to_accumulate| {
-                accumulator.merge_batch(values_to_accumulate)
+                accumulator.merge_batch(values_to_accumulate)?;
+                Ok(())
             },
         )?;
         Ok(())
@@ -313,6 +344,23 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
     }
 }
 
+/// Extension trait for [`Vec`] to account for allocations.
+pub trait VecAllocExt {
+    /// Element type of the collection.
+    type T;
+    /// Returns the bytes allocated by this Vec's buffer, i.e.
+    /// `size_of::<T>() * capacity()` (not counting heap allocations
+    /// owned by the elements). Does not include the size of `self`.
+    fn allocated_size(&self) -> usize;
+}
+
+impl<T> VecAllocExt for Vec<T> {
+    type T = T;
+    fn allocated_size(&self) -> usize {
+        std::mem::size_of::<T>() * self.capacity()
+    }
+}
+
 fn get_filter_at_indices(
     opt_filter: Option<&BooleanArray>,
     indices: &PrimitiveArray<UInt32Type>,
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
index 83ffc3717b..21b6cc29e8 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs
@@ -19,12 +19,12 @@ use std::sync::Arc;
 
 use arrow::array::AsArray;
 use arrow_array::{ArrayRef, BooleanArray};
-use arrow_buffer::BooleanBufferBuilder;
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use datafusion_common::Result;
 
 use crate::GroupsAccumulator;
 
-use super::accumulate::NullState;
+use super::{accumulate::NullState, EmitTo};
 
 /// An accumulator that implements a single operation over a
 /// [`BooleanArray`] where the accumulated state is also boolean (such
@@ -98,15 +98,28 @@ where
         Ok(())
     }
 
-    fn evaluate(&mut self) -> Result<ArrayRef> {
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
         let values = self.values.finish();
-        let nulls = self.null_state.build();
+
+        let values = match emit_to {
+            EmitTo::All => values,
+            EmitTo::First(n) => {
+                let first_n: BooleanBuffer = values.iter().take(n).collect();
+                // `finish` emptied self.values; re-append values at index n.. (retained groups)
+                for v in values.iter().skip(n) {
+                    self.values.append(v);
+                }
+                first_n
+            }
+        };
+
+        let nulls = self.null_state.build(emit_to);
         let values = BooleanArray::new(values, Some(nulls));
         Ok(Arc::new(values))
     }
 
-    fn state(&mut self) -> Result<Vec<ArrayRef>> {
-        self.evaluate().map(|arr| vec![arr])
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        self.evaluate(emit_to).map(|arr| vec![arr])
     }
 
     fn merge_batch(
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index 49d62e7a93..d2e64d373b 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -27,6 +27,42 @@ pub(crate) mod prim_op;
 use arrow_array::{ArrayRef, BooleanArray};
 use datafusion_common::Result;
 
+/// Describes how many rows should be emitted during grouping.
+#[derive(Debug, Clone, Copy)]
+pub enum EmitTo {
+    /// Emit all groups
+    All,
+    /// Emit only the first `n` groups and shift all existing group
+    /// indexes down by `n`.
+    ///
+    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
+    /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
+    First(usize),
+}
+
+impl EmitTo {
+    /// Removes the number of rows from `v` required to emit the right
+    /// number of rows, returning a `Vec` with elements taken, and the
+    /// remaining values in `v`.
+    ///
+    /// Avoids copying for `Self::All`; panics for `First(n)` if `n > v.len()`.
+    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
+        match self {
+            Self::All => {
+                // Take the entire vector, leave new (empty) vector
+                std::mem::take(v)
+            }
+            Self::First(n) => {
+                // split off the values at index n.. into t
+                let mut t = v.split_off(*n);
+                // swap so v keeps index n.. and t holds the first n values
+                std::mem::swap(v, &mut t);
+                t
+            }
+        }
+    }
+}
+
 /// `GroupAccumulator` implements a single aggregate (e.g. AVG) and
 /// stores the state for *all* groups internally.
 ///
@@ -72,28 +108,30 @@ pub trait GroupsAccumulator: Send {
     /// each group, and `evaluate` will produce that running sum as
     /// its output for all groups, in group_index order
     ///
-    /// The accumulator should free to release / reset it is internal
-    /// state after this call to the same as it was after being
-    /// initially created.
-    fn evaluate(&mut self) -> Result<ArrayRef>;
+    /// If `emit_to` is [`EmitTo::All`], the accumulator should
+    /// return all groups and release / reset its internal state
+    /// equivalent to when it was first created.
+    ///
+    /// If `emit_to` is [`EmitTo::First`], only the first `n` groups
+    /// should be emitted and the state for those first groups
+    /// removed. State for the remaining groups must be retained for
+    /// future use. The group_indices on subsequent calls to
+    /// `update_batch` or `merge_batch` will be shifted down by
+    /// `n`. See [`EmitTo::First`] for more details.
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
 
     /// Returns the intermediate aggregate state for this accumulator,
     /// used for multi-phase grouping, resetting its internal state.
     ///
-    /// The rows returned *must* be in group_index order: The value
-    /// for group_index 0, followed by 1, etc.  Any group_index that
-    /// did not have values, should be null.
-    ///
     /// For example, `AVG` might return two arrays: `SUM` and `COUNT`
     /// but the `MIN` aggregate would just return a single array.
     ///
     /// Note more sophisticated internal state can be passed as
     /// single `StructArray` rather than multiple arrays.
     ///
-    /// The accumulator should free to release / reset its internal
-    /// state after this call to the same as it was after being
-    /// initially created.
-    fn state(&mut self) -> Result<Vec<ArrayRef>>;
+    /// See [`Self::evaluate`] for details on the required output
+    /// order and `emit_to`.
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
 
     /// Merges intermediate state (the output from [`Self::state`])
     /// into this accumulator's values.
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
index a49651a5e3..adeaea712c 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -24,7 +24,7 @@ use datafusion_common::Result;
 
 use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator};
 
-use super::accumulate::NullState;
+use super::{accumulate::NullState, EmitTo};
 
 /// An accumulator that implements a single operation over
 /// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -112,16 +112,16 @@ where
         Ok(())
     }
 
-    fn evaluate(&mut self) -> Result<ArrayRef> {
-        let values = std::mem::take(&mut self.values);
-        let nulls = self.null_state.build();
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let values = emit_to.take_needed(&mut self.values);
+        let nulls = self.null_state.build(emit_to);
         let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); // no copy
 
         adjust_output_array(&self.data_type, Arc::new(values))
     }
 
-    fn state(&mut self) -> Result<Vec<ArrayRef>> {
-        self.evaluate().map(|arr| vec![arr])
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        self.evaluate(emit_to).map(|arr| vec![arr])
     }
 
     fn merge_batch(
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 165faba442..faa805dc92 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -46,8 +46,9 @@ pub mod utils;
 pub mod var_provider;
 pub mod window;
 
-// reexport this to maintain compatibility with anything that used from_slice previously
-pub use aggregate::groups_accumulator::{GroupsAccumulator, GroupsAccumulatorAdapter};
+pub use aggregate::groups_accumulator::{
+    EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
 pub use aggregate::AggregateExpr;
 
 pub use equivalence::{