You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/19 14:35:03 UTC

[arrow-datafusion] branch master updated: Simplify MemoryManager (#4522)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new dba34fcf5 Simplify MemoryManager (#4522)
dba34fcf5 is described below

commit dba34fcf5ff7ed89f9f1ac774a90f149c2bf8b74
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 19 14:34:56 2022 +0000

    Simplify MemoryManager (#4522)
    
    * Simplify MemoryManager
    
    * Fix tests
    
    * Add MemoryPool abstraction
    
    * Misc fixes
    
    * Remove MemoryManager
    
    * Tweak doc
    
    * Rename module
    
    * Format
    
    * Review feedback
    
    * Further tweaks
    
    * Fix Drop
---
 datafusion/core/src/execution/context.rs           |  30 +-
 .../core/src/execution/memory_manager/mod.rs       | 664 ---------------------
 datafusion/core/src/execution/memory_pool/mod.rs   | 222 +++++++
 datafusion/core/src/execution/memory_pool/pool.rs  | 285 +++++++++
 .../{memory_manager => memory_pool}/proxy.rs       |  87 ---
 datafusion/core/src/execution/mod.rs               |   5 +-
 datafusion/core/src/execution/runtime_env.rs       |  54 +-
 datafusion/core/src/lib.rs                         |   1 +
 .../core/src/physical_plan/aggregates/hash.rs      |  31 +-
 .../src/physical_plan/aggregates/no_grouping.rs    |  26 +-
 .../core/src/physical_plan/aggregates/row_hash.rs  |  30 +-
 datafusion/core/src/physical_plan/common.rs        |   2 +-
 datafusion/core/src/physical_plan/explain.rs       |   3 +-
 .../core/src/physical_plan/metrics/composite.rs    |  12 +-
 .../core/src/physical_plan/metrics/tracker.rs      |  53 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    | 177 ++----
 .../physical_plan/sorts/sort_preserving_merge.rs   |   8 +-
 datafusion/core/tests/memory_limit.rs              |  26 +-
 datafusion/core/tests/order_spill_fuzz.rs          |  47 +-
 datafusion/core/tests/parquet/filter_pushdown.rs   |   4 +-
 datafusion/core/tests/provider_filter_pushdown.rs  |   5 +-
 test-utils/src/data_gen.rs                         |  55 +-
 22 files changed, 747 insertions(+), 1080 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ce85f1821..d6eb5240a 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -99,6 +99,7 @@ use url::Url;
 
 use crate::catalog::listing_schema::ListingSchemaProvider;
 use crate::datasource::object_store::ObjectStoreUrl;
+use crate::execution::memory_pool::MemoryPool;
 use uuid::Uuid;
 
 use super::options::{
@@ -1960,6 +1961,11 @@ impl TaskContext {
         self.task_id.clone()
     }
 
+    /// Return the [`MemoryPool`] associated with this [TaskContext]
+    pub fn memory_pool(&self) -> &Arc<dyn MemoryPool> {
+        &self.runtime.memory_pool
+    }
+
     /// Return the [RuntimeEnv] associated with this [TaskContext]
     pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
         self.runtime.clone()
@@ -2025,6 +2031,7 @@ mod tests {
     use super::*;
     use crate::assert_batches_eq;
     use crate::execution::context::QueryPlanner;
+    use crate::execution::memory_pool::MemoryConsumer;
     use crate::execution::runtime_env::RuntimeConfig;
     use crate::physical_plan::expressions::AvgAccumulator;
     use crate::test;
@@ -2046,24 +2053,27 @@ mod tests {
     #[tokio::test]
     async fn shared_memory_and_disk_manager() {
         // Demonstrate the ability to share DiskManager and
-        // MemoryManager between two different executions.
+        // MemoryPool between two different executions.
         let ctx1 = SessionContext::new();
 
         // configure with same memory / disk manager
-        let memory_manager = ctx1.runtime_env().memory_manager.clone();
+        let memory_pool = ctx1.runtime_env().memory_pool.clone();
+
+        let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
+        reservation.grow(100);
+
         let disk_manager = ctx1.runtime_env().disk_manager.clone();
 
         let ctx2 =
             SessionContext::with_config_rt(SessionConfig::new(), ctx1.runtime_env());
 
-        assert!(std::ptr::eq(
-            Arc::as_ptr(&memory_manager),
-            Arc::as_ptr(&ctx1.runtime_env().memory_manager)
-        ));
-        assert!(std::ptr::eq(
-            Arc::as_ptr(&memory_manager),
-            Arc::as_ptr(&ctx2.runtime_env().memory_manager)
-        ));
+        assert_eq!(ctx1.runtime_env().memory_pool.reserved(), 100);
+        assert_eq!(ctx2.runtime_env().memory_pool.reserved(), 100);
+
+        drop(reservation);
+
+        assert_eq!(ctx1.runtime_env().memory_pool.reserved(), 0);
+        assert_eq!(ctx2.runtime_env().memory_pool.reserved(), 0);
 
         assert!(std::ptr::eq(
             Arc::as_ptr(&disk_manager),
diff --git a/datafusion/core/src/execution/memory_manager/mod.rs b/datafusion/core/src/execution/memory_manager/mod.rs
deleted file mode 100644
index c3ff444eb..000000000
--- a/datafusion/core/src/execution/memory_manager/mod.rs
+++ /dev/null
@@ -1,664 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Manages all available memory during query execution
-
-use crate::error::{DataFusionError, Result};
-use async_trait::async_trait;
-use hashbrown::HashSet;
-use log::{debug, warn};
-use parking_lot::{Condvar, Mutex};
-use std::fmt;
-use std::fmt::{Debug, Display, Formatter};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-pub mod proxy;
-
-static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
-
-#[derive(Debug, Clone)]
-/// Configuration information for memory management
-pub enum MemoryManagerConfig {
-    /// Use the existing [MemoryManager]
-    Existing(Arc<MemoryManager>),
-
-    /// Create a new [MemoryManager] that will use up to some
-    /// fraction of total system memory.
-    New {
-        /// Max execution memory allowed for DataFusion.  Defaults to
-        /// `usize::MAX`, which will not attempt to limit the memory
-        /// used during plan execution.
-        max_memory: usize,
-
-        /// The fraction of `max_memory` that the memory manager will
-        /// use for execution.
-        ///
-        /// The purpose of this config is to set aside memory for
-        /// untracked data structures, and imprecise size estimation
-        /// during memory acquisition.  Defaults to 0.7
-        memory_fraction: f64,
-    },
-}
-
-impl Default for MemoryManagerConfig {
-    fn default() -> Self {
-        Self::New {
-            max_memory: usize::MAX,
-            memory_fraction: 0.7,
-        }
-    }
-}
-
-impl MemoryManagerConfig {
-    /// Create a new memory [MemoryManager] with no limit on the
-    /// memory used
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    /// Create a configuration based on an existing [MemoryManager]
-    pub fn new_existing(existing: Arc<MemoryManager>) -> Self {
-        Self::Existing(existing)
-    }
-
-    /// Create a new [MemoryManager] with a `max_memory` and `fraction`
-    pub fn try_new_limit(max_memory: usize, memory_fraction: f64) -> Result<Self> {
-        if max_memory == 0 {
-            return Err(DataFusionError::Plan(format!(
-                "invalid max_memory. Expected greater than 0, got {}",
-                max_memory
-            )));
-        }
-        if !(memory_fraction > 0f64 && memory_fraction <= 1f64) {
-            return Err(DataFusionError::Plan(format!(
-                "invalid fraction. Expected greater than 0 and less than 1.0, got {}",
-                memory_fraction
-            )));
-        }
-
-        Ok(Self::New {
-            max_memory,
-            memory_fraction,
-        })
-    }
-
-    /// return the maximum size of the memory, in bytes, this config will allow
-    fn pool_size(&self) -> usize {
-        match self {
-            MemoryManagerConfig::Existing(existing) => existing.pool_size,
-            MemoryManagerConfig::New {
-                max_memory,
-                memory_fraction,
-            } => (*max_memory as f64 * *memory_fraction) as usize,
-        }
-    }
-}
-
-fn next_id() -> usize {
-    CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
-    /// consumers that can grow its memory usage by requesting more from the memory manager or
-    /// shrinks its memory usage when we can no more assign available memory to it.
-    /// Examples are spillable sorter, spillable hashmap, etc.
-    Requesting,
-    /// consumers that are not spillable, counting in for only tracking purpose.
-    Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
-    /// partition the consumer belongs to
-    pub partition_id: usize,
-    /// unique id
-    pub id: usize,
-}
-
-impl MemoryConsumerId {
-    /// Auto incremented new Id
-    pub fn new(partition_id: usize) -> Self {
-        let id = next_id();
-        Self { partition_id, id }
-    }
-}
-
-impl Display for MemoryConsumerId {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f, "{}:{}", self.partition_id, self.id)
-    }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
-    /// Display name of the consumer
-    fn name(&self) -> String;
-
-    /// Unique id of the consumer
-    fn id(&self) -> &MemoryConsumerId;
-
-    /// Ptr to MemoryManager
-    fn memory_manager(&self) -> Arc<MemoryManager>;
-
-    /// Partition that the consumer belongs to
-    fn partition_id(&self) -> usize {
-        self.id().partition_id
-    }
-
-    /// Type of the consumer
-    fn type_(&self) -> &ConsumerType;
-
-    /// Grow memory by `required` to buffer more data in memory,
-    /// this may trigger spill before grow when the memory threshold is
-    /// reached for this consumer.
-    async fn try_grow(&self, required: usize) -> Result<()> {
-        let current = self.mem_used();
-        debug!(
-            "trying to acquire {} whiling holding {} from consumer {}",
-            human_readable_size(required),
-            human_readable_size(current),
-            self.id(),
-        );
-
-        let can_grow_directly =
-            self.memory_manager().can_grow_directly(required, current);
-        if !can_grow_directly {
-            debug!(
-                "Failed to grow memory of {} directly from consumer {}, spilling first ...",
-                human_readable_size(required),
-                self.id()
-            );
-            let freed = self.spill().await?;
-            self.memory_manager()
-                .record_free_then_acquire(freed, required);
-        }
-        Ok(())
-    }
-
-    /// Grow without spilling to the disk. It grows the memory directly
-    /// so it should be only used when the consumer already allocated the
-    /// memory and it is safe to grow without spilling.
-    fn grow(&self, required: usize) {
-        self.memory_manager().record_free_then_acquire(0, required);
-    }
-
-    /// Return `freed` memory to the memory manager,
-    /// may wake up other requesters waiting for their minimum memory quota.
-    fn shrink(&self, freed: usize) {
-        self.memory_manager().record_free(freed);
-    }
-
-    /// Spill in-memory buffers to disk, free memory, return the previous used
-    async fn spill(&self) -> Result<usize>;
-
-    /// Current memory used by this consumer
-    fn mem_used(&self) -> usize;
-}
-
-impl Debug for dyn MemoryConsumer {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{}[{}]: {}",
-            self.name(),
-            self.id(),
-            human_readable_size(self.mem_used())
-        )
-    }
-}
-
-impl Display for dyn MemoryConsumer {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f, "{}[{}]", self.name(), self.id(),)
-    }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1).
-   The actual max memory DataFusion could use `pool_size =  max_memory * memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory Consumers'. Operators or others are encouraged to
-   register themselves to the memory manager and report its usage through `mem_used()`.
-3. There are two kinds of consumers:
-   - 'Requesting' consumers that would acquire memory during its execution and release memory through `spill` if no more memory is available.
-   - 'Tracking' consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer could acquire a maximum of
-   (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
-            Memory Space for the DataFusion Lib / Process of `pool_size`
-   ┌──────────────────────────────────────────────z─────────────────────────────┐
-   │                                              z                             │
-   │                                              z                             │
-   │               Requesting                     z          Tracking           │
-   │            Memory Consumers                  z       Memory Consumers      │
-   │                                              z                             │
-   │                                              z                             │
-   └──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
-#[derive(Debug)]
-pub struct MemoryManager {
-    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
-    pool_size: usize,
-    requesters_total: Arc<Mutex<usize>>,
-    trackers_total: AtomicUsize,
-    cv: Condvar,
-}
-
-impl MemoryManager {
-    /// Create new memory manager based on the configuration
-    #[allow(clippy::mutex_atomic)]
-    pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
-        let pool_size = config.pool_size();
-
-        match config {
-            MemoryManagerConfig::Existing(manager) => manager,
-            MemoryManagerConfig::New { .. } => {
-                debug!(
-                    "Creating memory manager with initial size {}",
-                    human_readable_size(pool_size)
-                );
-
-                Arc::new(Self {
-                    requesters: Arc::new(Mutex::new(HashSet::new())),
-                    pool_size,
-                    requesters_total: Arc::new(Mutex::new(0)),
-                    trackers_total: AtomicUsize::new(0),
-                    cv: Condvar::new(),
-                })
-            }
-        }
-    }
-
-    fn get_tracker_total(&self) -> usize {
-        self.trackers_total.load(Ordering::SeqCst)
-    }
-
-    pub(crate) fn grow_tracker_usage(&self, delta: usize) {
-        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
-    }
-
-    pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
-        let update =
-            self.trackers_total
-                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
-                    if x >= delta {
-                        Some(x - delta)
-                    } else {
-                        None
-                    }
-                });
-        update.unwrap_or_else(|_| {
-            panic!(
-                "Tracker total memory shrink by {} underflow, current value is ",
-                delta
-            )
-        });
-    }
-
-    /// Return the total memory usage for all requesters
-    pub fn get_requester_total(&self) -> usize {
-        *self.requesters_total.lock()
-    }
-
-    /// Register a new memory requester
-    pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
-        self.requesters.lock().insert(requester_id.clone());
-    }
-
-    fn max_mem_for_requesters(&self) -> usize {
-        let trk_total = self.get_tracker_total();
-        self.pool_size.saturating_sub(trk_total)
-    }
-
-    /// Grow memory attempt from a consumer, return if we could grant that much to it
-    fn can_grow_directly(&self, required: usize, current: usize) -> bool {
-        let num_rqt = self.requesters.lock().len();
-        let mut rqt_current_used = self.requesters_total.lock();
-        let mut rqt_max = self.max_mem_for_requesters();
-
-        let granted;
-        loop {
-            let max_per_rqt = rqt_max / num_rqt;
-            let min_per_rqt = max_per_rqt / 2;
-
-            if required + current >= max_per_rqt {
-                granted = false;
-                break;
-            }
-
-            let remaining = rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
-            if remaining >= required {
-                granted = true;
-                *rqt_current_used += required;
-                break;
-            } else if current < min_per_rqt {
-                // if we cannot acquire at lease 1/2n memory, just wait for others
-                // to spill instead spill self frequently with limited total mem
-                debug!(
-                    "Cannot acquire a minimum amount of {} memory from the manager of total {}, waiting for others to spill ...",
-                    human_readable_size(min_per_rqt), human_readable_size(self.pool_size));
-                let now = Instant::now();
-                self.cv.wait(&mut rqt_current_used);
-                let elapsed = now.elapsed();
-                if elapsed > Duration::from_secs(10) {
-                    warn!("Elapsed on waiting for spilling: {:.2?}", elapsed);
-                }
-            } else {
-                granted = false;
-                break;
-            }
-
-            rqt_max = self.max_mem_for_requesters();
-        }
-
-        granted
-    }
-
-    fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
-        let mut requesters_total = self.requesters_total.lock();
-        debug!(
-            "free_then_acquire: total {}, freed {}, acquired {}",
-            human_readable_size(*requesters_total),
-            human_readable_size(freed),
-            human_readable_size(acquired)
-        );
-        assert!(*requesters_total >= freed);
-        *requesters_total -= freed;
-        *requesters_total += acquired;
-        self.cv.notify_all();
-    }
-
-    fn record_free(&self, freed: usize) {
-        let mut requesters_total = self.requesters_total.lock();
-        debug!(
-            "free: total {}, freed {}",
-            human_readable_size(*requesters_total),
-            human_readable_size(freed)
-        );
-        assert!(*requesters_total >= freed);
-        *requesters_total -= freed;
-        self.cv.notify_all();
-    }
-
-    /// Drop a memory consumer and reclaim the memory
-    pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
-        // find in requesters first
-        {
-            let mut requesters = self.requesters.lock();
-            if requesters.remove(id) {
-                let mut total = self.requesters_total.lock();
-                assert!(*total >= mem_used);
-                *total -= mem_used;
-                self.cv.notify_all();
-                return;
-            }
-        }
-        self.shrink_tracker_usage(mem_used);
-        self.cv.notify_all();
-    }
-}
-
-impl Display for MemoryManager {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f,
-               "MemoryManager usage statistics: total {}, trackers used {}, total {} requesters used: {}",
-               human_readable_size(self.pool_size),
-               human_readable_size(self.get_tracker_total()),
-               self.requesters.lock().len(),
-               human_readable_size(self.get_requester_total()),
-        )
-    }
-}
-
-const TB: u64 = 1 << 40;
-const GB: u64 = 1 << 30;
-const MB: u64 = 1 << 20;
-const KB: u64 = 1 << 10;
-
-/// Present size in human readable form
-pub fn human_readable_size(size: usize) -> String {
-    let size = size as u64;
-    let (value, unit) = {
-        if size >= 2 * TB {
-            (size as f64 / TB as f64, "TB")
-        } else if size >= 2 * GB {
-            (size as f64 / GB as f64, "GB")
-        } else if size >= 2 * MB {
-            (size as f64 / MB as f64, "MB")
-        } else if size >= 2 * KB {
-            (size as f64 / KB as f64, "KB")
-        } else {
-            (size as f64, "B")
-        }
-    };
-    format!("{:.1} {}", value, unit)
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::error::Result;
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-    use crate::execution::MemoryConsumer;
-    use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
-    use async_trait::async_trait;
-    use std::sync::atomic::{AtomicUsize, Ordering};
-    use std::sync::Arc;
-
-    struct DummyRequester {
-        id: MemoryConsumerId,
-        runtime: Arc<RuntimeEnv>,
-        spills: AtomicUsize,
-        mem_used: AtomicUsize,
-    }
-
-    impl DummyRequester {
-        fn new(partition: usize, runtime: Arc<RuntimeEnv>) -> Self {
-            Self {
-                id: MemoryConsumerId::new(partition),
-                runtime,
-                spills: AtomicUsize::new(0),
-                mem_used: AtomicUsize::new(0),
-            }
-        }
-
-        async fn do_with_mem(&self, grow: usize) -> Result<()> {
-            self.try_grow(grow).await?;
-            self.mem_used.fetch_add(grow, Ordering::SeqCst);
-            Ok(())
-        }
-
-        fn get_spills(&self) -> usize {
-            self.spills.load(Ordering::SeqCst)
-        }
-    }
-
-    #[async_trait]
-    impl MemoryConsumer for DummyRequester {
-        fn name(&self) -> String {
-            "dummy".to_owned()
-        }
-
-        fn id(&self) -> &MemoryConsumerId {
-            &self.id
-        }
-
-        fn memory_manager(&self) -> Arc<MemoryManager> {
-            self.runtime.memory_manager.clone()
-        }
-
-        fn type_(&self) -> &ConsumerType {
-            &ConsumerType::Requesting
-        }
-
-        async fn spill(&self) -> Result<usize> {
-            self.spills.fetch_add(1, Ordering::SeqCst);
-            let used = self.mem_used.swap(0, Ordering::SeqCst);
-            Ok(used)
-        }
-
-        fn mem_used(&self) -> usize {
-            self.mem_used.load(Ordering::SeqCst)
-        }
-    }
-
-    struct DummyTracker {
-        id: MemoryConsumerId,
-        runtime: Arc<RuntimeEnv>,
-        mem_used: usize,
-    }
-
-    impl DummyTracker {
-        fn new(partition: usize, runtime: Arc<RuntimeEnv>, mem_used: usize) -> Self {
-            runtime.grow_tracker_usage(mem_used);
-            Self {
-                id: MemoryConsumerId::new(partition),
-                runtime,
-                mem_used,
-            }
-        }
-    }
-
-    #[async_trait]
-    impl MemoryConsumer for DummyTracker {
-        fn name(&self) -> String {
-            "dummy".to_owned()
-        }
-
-        fn id(&self) -> &MemoryConsumerId {
-            &self.id
-        }
-
-        fn memory_manager(&self) -> Arc<MemoryManager> {
-            self.runtime.memory_manager.clone()
-        }
-
-        fn type_(&self) -> &ConsumerType {
-            &ConsumerType::Tracking
-        }
-
-        async fn spill(&self) -> Result<usize> {
-            Ok(0)
-        }
-
-        fn mem_used(&self) -> usize {
-            self.mem_used
-        }
-    }
-
-    #[tokio::test]
-    async fn basic_functionalities() {
-        let config = RuntimeConfig::new()
-            .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
-        let runtime = Arc::new(RuntimeEnv::new(config).unwrap());
-
-        DummyTracker::new(0, runtime.clone(), 5);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 5);
-
-        let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 15);
-
-        DummyTracker::new(0, runtime.clone(), 15);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 30);
-
-        runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
-
-        // MemTrackingMetrics as an easy way to track memory
-        let ms = ExecutionPlanMetricsSet::new();
-        let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
-        tracking_metric.init_mem_used(15);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 35);
-
-        drop(tracking_metric);
-        assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
-
-        let requester1 = DummyRequester::new(0, runtime.clone());
-        runtime.register_requester(requester1.id());
-
-        // first requester entered, should be able to use any of the remaining 80
-        requester1.do_with_mem(40).await.unwrap();
-        requester1.do_with_mem(10).await.unwrap();
-        assert_eq!(requester1.get_spills(), 0);
-        assert_eq!(requester1.mem_used(), 50);
-        assert_eq!(*runtime.memory_manager.requesters_total.lock(), 50);
-
-        let requester2 = DummyRequester::new(0, runtime.clone());
-        runtime.register_requester(requester2.id());
-
-        requester2.do_with_mem(20).await.unwrap();
-        requester2.do_with_mem(30).await.unwrap();
-        assert_eq!(requester2.get_spills(), 1);
-        assert_eq!(requester2.mem_used(), 30);
-
-        requester1.do_with_mem(10).await.unwrap();
-        assert_eq!(requester1.get_spills(), 1);
-        assert_eq!(requester1.mem_used(), 10);
-
-        assert_eq!(*runtime.memory_manager.requesters_total.lock(), 40);
-    }
-
-    #[tokio::test]
-    #[should_panic(expected = "invalid max_memory. Expected greater than 0, got 0")]
-    async fn test_try_new_with_limit_0() {
-        MemoryManagerConfig::try_new_limit(0, 1.0).unwrap();
-    }
-
-    #[tokio::test]
-    #[should_panic(
-        expected = "invalid fraction. Expected greater than 0 and less than 1.0, got -9.6"
-    )]
-    async fn test_try_new_with_limit_neg_fraction() {
-        MemoryManagerConfig::try_new_limit(100, -9.6).unwrap();
-    }
-
-    #[tokio::test]
-    #[should_panic(
-        expected = "invalid fraction. Expected greater than 0 and less than 1.0, got 9.6"
-    )]
-    async fn test_try_new_with_limit_too_large() {
-        MemoryManagerConfig::try_new_limit(100, 9.6).unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_try_new_with_limit_pool_size() {
-        let config = MemoryManagerConfig::try_new_limit(100, 0.5).unwrap();
-        assert_eq!(config.pool_size(), 50);
-
-        let config = MemoryManagerConfig::try_new_limit(100000, 0.1).unwrap();
-        assert_eq!(config.pool_size(), 10000);
-    }
-
-    #[tokio::test]
-    async fn test_memory_manager_underflow() {
-        let config = MemoryManagerConfig::try_new_limit(100, 0.5).unwrap();
-        let manager = MemoryManager::new(config);
-        manager.grow_tracker_usage(100);
-
-        manager.register_requester(&MemoryConsumerId::new(1));
-        assert!(!manager.can_grow_directly(20, 0));
-    }
-}
diff --git a/datafusion/core/src/execution/memory_pool/mod.rs b/datafusion/core/src/execution/memory_pool/mod.rs
new file mode 100644
index 000000000..6369cda4d
--- /dev/null
+++ b/datafusion/core/src/execution/memory_pool/mod.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use std::sync::Arc;
+
+mod pool;
+pub mod proxy;
+
+pub use pool::*;
+
+/// The pool of memory on which [`MemoryReservation`] record their memory reservations
+pub trait MemoryPool: Send + Sync + std::fmt::Debug {
+    /// Registers a new [`MemoryConsumer`]
+    ///
+    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
+    fn register(&self, _consumer: &MemoryConsumer) {}
+
+    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
+    ///
+    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
+    fn unregister(&self, _consumer: &MemoryConsumer) {}
+
+    /// Infallibly grow the provided `reservation` by `additional` bytes
+    ///
+    /// This must always succeed
+    fn grow(&self, reservation: &MemoryReservation, additional: usize);
+
+    /// Infallibly shrink the provided `reservation` by `shrink` bytes
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
+
+    /// Attempt to grow the provided `reservation` by `additional` bytes
+    ///
+    /// On error the `allocation` will not be increased in size
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
+
+    /// Return the total amount of memory reserved
+    fn reserved(&self) -> usize;
+}
+
+/// A memory consumer that can be tracked by [`MemoryReservation`] in a [`MemoryPool`]
+#[derive(Debug)]
+pub struct MemoryConsumer {
+    name: String,
+    can_spill: bool,
+}
+
+impl MemoryConsumer {
+    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
+    pub fn new(name: impl Into<String>) -> Self {
+        Self {
+            name: name.into(),
+            can_spill: false,
+        }
+    }
+
+    /// Set whether this allocation can be spilled to disk
+    pub fn with_can_spill(self, can_spill: bool) -> Self {
+        Self { can_spill, ..self }
+    }
+
+    /// Returns true if this allocation can spill to disk
+    pub fn can_spill(&self) -> bool {
+        self.can_spill
+    }
+
+    /// Returns the name associated with this allocation
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
+    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
+    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
+        pool.register(&self);
+        MemoryReservation {
+            consumer: self,
+            size: 0,
+            policy: Arc::clone(pool),
+        }
+    }
+}
+
+/// A [`MemoryReservation`] tracks a reservation of memory in a [`MemoryPool`]
+/// that is freed back to the pool on drop
+#[derive(Debug)]
+pub struct MemoryReservation {
+    consumer: MemoryConsumer,
+    size: usize,
+    policy: Arc<dyn MemoryPool>,
+}
+
+impl MemoryReservation {
+    /// Returns the size of this reservation in bytes
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Frees all bytes from this reservation returning the number of bytes freed
+    pub fn free(&mut self) -> usize {
+        let size = self.size;
+        if size != 0 {
+            self.shrink(size)
+        }
+        size
+    }
+
+    /// Frees `capacity` bytes from this reservation
+    ///
+    /// # Panics
+    ///
+    /// Panics if `capacity` exceeds [`Self::size`]
+    pub fn shrink(&mut self, capacity: usize) {
+        let new_size = self.size.checked_sub(capacity).unwrap();
+        self.policy.shrink(self, capacity);
+        self.size = new_size
+    }
+
+    /// Sets the size of this reservation to `capacity`
+    pub fn resize(&mut self, capacity: usize) {
+        use std::cmp::Ordering;
+        match capacity.cmp(&self.size) {
+            Ordering::Greater => self.grow(capacity - self.size),
+            Ordering::Less => self.shrink(self.size - capacity),
+            _ => {}
+        }
+    }
+
+    /// Increase the size of this reservation by `capacity` bytes
+    pub fn grow(&mut self, capacity: usize) {
+        self.policy.grow(self, capacity);
+        self.size += capacity;
+    }
+
+    /// Try to increase the size of this reservation by `capacity` bytes
+    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
+        self.policy.try_grow(self, capacity)?;
+        self.size += capacity;
+        Ok(())
+    }
+}
+
+impl Drop for MemoryReservation {
+    fn drop(&mut self) {
+        self.free();
+        self.policy.unregister(&self.consumer);
+    }
+}
+
+const TB: u64 = 1 << 40;
+const GB: u64 = 1 << 30;
+const MB: u64 = 1 << 20;
+const KB: u64 = 1 << 10;
+
+/// Present size in human readable form
+pub fn human_readable_size(size: usize) -> String {
+    let size = size as u64;
+    let (value, unit) = {
+        if size >= 2 * TB {
+            (size as f64 / TB as f64, "TB")
+        } else if size >= 2 * GB {
+            (size as f64 / GB as f64, "GB")
+        } else if size >= 2 * MB {
+            (size as f64 / MB as f64, "MB")
+        } else if size >= 2 * KB {
+            (size as f64 / KB as f64, "KB")
+        } else {
+            (size as f64, "B")
+        }
+    };
+    format!("{:.1} {}", value, unit)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_memory_pool_underflow() {
+        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
+        let mut a1 = MemoryConsumer::new("a1").register(&pool);
+        assert_eq!(pool.reserved(), 0);
+
+        a1.grow(100);
+        assert_eq!(pool.reserved(), 100);
+
+        assert_eq!(a1.free(), 100);
+        assert_eq!(pool.reserved(), 0);
+
+        a1.try_grow(100).unwrap_err();
+        assert_eq!(pool.reserved(), 0);
+
+        a1.try_grow(30).unwrap();
+        assert_eq!(pool.reserved(), 30);
+
+        let mut a2 = MemoryConsumer::new("a2").register(&pool);
+        a2.try_grow(25).unwrap_err();
+        assert_eq!(pool.reserved(), 30);
+
+        drop(a1);
+        assert_eq!(pool.reserved(), 0);
+
+        a2.try_grow(25).unwrap();
+        assert_eq!(pool.reserved(), 25);
+    }
+}
diff --git a/datafusion/core/src/execution/memory_pool/pool.rs b/datafusion/core/src/execution/memory_pool/pool.rs
new file mode 100644
index 000000000..5d28629be
--- /dev/null
+++ b/datafusion/core/src/execution/memory_pool/pool.rs
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
+use datafusion_common::{DataFusionError, Result};
+use parking_lot::Mutex;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// A [`MemoryPool`] that enforces no limit
+#[derive(Debug, Default)]
+pub struct UnboundedMemoryPool {
+    used: AtomicUsize,
+}
+
+impl MemoryPool for UnboundedMemoryPool {
+    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
+        self.used.fetch_add(additional, Ordering::Relaxed);
+    }
+
+    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
+        self.used.fetch_sub(shrink, Ordering::Relaxed);
+    }
+
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        self.grow(reservation, additional);
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        self.used.load(Ordering::Relaxed)
+    }
+}
+
+/// A [`MemoryPool`] that implements a greedy first-come first-serve limit
+#[derive(Debug)]
+pub struct GreedyMemoryPool {
+    pool_size: usize,
+    used: AtomicUsize,
+}
+
+impl GreedyMemoryPool {
+    /// Allocate up to `limit` bytes
+    pub fn new(pool_size: usize) -> Self {
+        Self {
+            pool_size,
+            used: AtomicUsize::new(0),
+        }
+    }
+}
+
+impl MemoryPool for GreedyMemoryPool {
+    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
+        self.used.fetch_add(additional, Ordering::Relaxed);
+    }
+
+    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
+        self.used.fetch_sub(shrink, Ordering::Relaxed);
+    }
+
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        self.used
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+                let new_used = used + additional;
+                (new_used <= self.pool_size).then_some(new_used)
+            })
+            .map_err(|used| {
+                insufficient_capacity_err(reservation, additional, self.pool_size - used)
+            })?;
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        self.used.load(Ordering::Relaxed)
+    }
+}
+
+/// A [`MemoryPool`] that prevents spillable reservations from using more than
+/// an even fraction of the available memory sans any unspillable reservations
+/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
+///
+///    ┌───────────────────────z──────────────────────z───────────────┐
+///    │                       z                      z               │
+///    │                       z                      z               │
+///    │       Spillable       z       Unspillable    z     Free      │
+///    │        Memory         z        Memory        z    Memory     │
+///    │                       z                      z               │
+///    │                       z                      z               │
+///    └───────────────────────z──────────────────────z───────────────┘
+///
+/// Unspillable memory is allocated in a first-come, first-serve fashion
+#[derive(Debug)]
+pub struct FairSpillPool {
+    /// The total memory limit
+    pool_size: usize,
+
+    state: Mutex<FairSpillPoolState>,
+}
+
+#[derive(Debug)]
+struct FairSpillPoolState {
+    /// The number of consumers that can spill
+    num_spill: usize,
+
+    /// The total amount of memory reserved that can be spilled
+    spillable: usize,
+
+    /// The total amount of memory reserved by consumers that cannot spill
+    unspillable: usize,
+}
+
+impl FairSpillPool {
+    /// Allocate up to `limit` bytes
+    pub fn new(pool_size: usize) -> Self {
+        Self {
+            pool_size,
+            state: Mutex::new(FairSpillPoolState {
+                num_spill: 0,
+                spillable: 0,
+                unspillable: 0,
+            }),
+        }
+    }
+}
+
+impl MemoryPool for FairSpillPool {
+    fn register(&self, consumer: &MemoryConsumer) {
+        if consumer.can_spill {
+            self.state.lock().num_spill += 1;
+        }
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        if consumer.can_spill {
+            self.state.lock().num_spill -= 1;
+        }
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        let mut state = self.state.lock();
+        match reservation.consumer.can_spill {
+            true => state.spillable += additional,
+            false => state.unspillable += additional,
+        }
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        let mut state = self.state.lock();
+        match reservation.consumer.can_spill {
+            true => state.spillable -= shrink,
+            false => state.unspillable -= shrink,
+        }
+    }
+
+    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        let mut state = self.state.lock();
+
+        match reservation.consumer.can_spill {
+            true => {
+                // The total amount of memory available to spilling consumers
+                let spill_available = self.pool_size.saturating_sub(state.unspillable);
+
+                // No spiller may use more than their fraction of the memory available
+                let available = spill_available
+                    .checked_div(state.num_spill)
+                    .unwrap_or(spill_available);
+
+                if reservation.size + additional > available {
+                    return Err(insufficient_capacity_err(
+                        reservation,
+                        additional,
+                        available,
+                    ));
+                }
+                state.spillable += additional;
+            }
+            false => {
+                let available = self
+                    .pool_size
+                    .saturating_sub(state.unspillable + state.unspillable);
+
+                if available < additional {
+                    return Err(insufficient_capacity_err(
+                        reservation,
+                        additional,
+                        available,
+                    ));
+                }
+                state.unspillable += additional;
+            }
+        }
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        let state = self.state.lock();
+        state.spillable + state.unspillable
+    }
+}
+
+fn insufficient_capacity_err(
+    reservation: &MemoryReservation,
+    additional: usize,
+    available: usize,
+) -> DataFusionError {
+    DataFusionError::ResourcesExhausted(format!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.consumer.name, reservation.size, available))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_fair() {
+        let pool = Arc::new(FairSpillPool::new(100)) as _;
+
+        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
+        // Can grow beyond capacity of pool
+        r1.grow(2000);
+        assert_eq!(pool.reserved(), 2000);
+
+        let mut r2 = MemoryConsumer::new("s1")
+            .with_can_spill(true)
+            .register(&pool);
+        // Can grow beyond capacity of pool
+        r2.grow(2000);
+
+        assert_eq!(pool.reserved(), 4000);
+
+        let err = r2.try_grow(1).unwrap_err().to_string();
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for s1 with 2000 bytes already allocated - maximum available is 0");
+
+        let err = r2.try_grow(1).unwrap_err().to_string();
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for s1 with 2000 bytes already allocated - maximum available is 0");
+
+        r1.shrink(1990);
+        r2.shrink(2000);
+
+        assert_eq!(pool.reserved(), 10);
+
+        r1.try_grow(10).unwrap();
+        assert_eq!(pool.reserved(), 20);
+
+        // Can grow a2 to 80 as only spilling consumer
+        r2.try_grow(80).unwrap();
+        assert_eq!(pool.reserved(), 100);
+
+        r2.shrink(70);
+
+        assert_eq!(r1.size(), 20);
+        assert_eq!(r2.size(), 10);
+        assert_eq!(pool.reserved(), 30);
+
+        let mut r3 = MemoryConsumer::new("s2")
+            .with_can_spill(true)
+            .register(&pool);
+
+        let err = r3.try_grow(70).unwrap_err().to_string();
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for s2 with 0 bytes already allocated - maximum available is 40");
+
+        //Shrinking a2 to zero doesn't allow a3 to allocate more than 45
+        r2.free();
+        let err = r3.try_grow(70).unwrap_err().to_string();
+        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for s2 with 0 bytes already allocated - maximum available is 40");
+
+        // But dropping a2 does
+        drop(r2);
+        assert_eq!(pool.reserved(), 20);
+        r3.try_grow(80).unwrap();
+    }
+}
diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_pool/proxy.rs
similarity index 60%
rename from datafusion/core/src/execution/memory_manager/proxy.rs
rename to datafusion/core/src/execution/memory_pool/proxy.rs
index 2a5bd2507..43532f9a8 100644
--- a/datafusion/core/src/execution/memory_manager/proxy.rs
+++ b/datafusion/core/src/execution/memory_pool/proxy.rs
@@ -16,96 +16,9 @@
 // under the License.
 
 //! Utilities that help with tracking of memory allocations.
-use std::sync::Arc;
 
-use async_trait::async_trait;
-use datafusion_common::DataFusionError;
 use hashbrown::raw::{Bucket, RawTable};
 
-use super::{ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager};
-
-/// Accounting proxy for memory usage.
-///
-/// This is helpful when calculating memory usage on the actual data structure is expensive but it is easy to track
-/// allocations while processing data.
-///
-/// This consumer will NEVER spill.
-pub struct MemoryConsumerProxy {
-    /// Name
-    name: String,
-
-    /// Consumer ID.
-    id: MemoryConsumerId,
-
-    /// Linked memory manager.
-    memory_manager: Arc<MemoryManager>,
-
-    /// Currently used size in bytes.
-    used: usize,
-}
-
-impl MemoryConsumerProxy {
-    /// Create new proxy consumer and register it with the given memory manager.
-    pub fn new(
-        name: impl Into<String>,
-        id: MemoryConsumerId,
-        memory_manager: Arc<MemoryManager>,
-    ) -> Self {
-        memory_manager.register_requester(&id);
-
-        Self {
-            name: name.into(),
-            id,
-            memory_manager,
-            used: 0,
-        }
-    }
-
-    /// Try to allocate given amount of memory.
-    pub async fn alloc(&mut self, bytes: usize) -> Result<(), DataFusionError> {
-        self.try_grow(bytes).await?;
-        self.used = self.used.checked_add(bytes).expect("overflow");
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl MemoryConsumer for MemoryConsumerProxy {
-    fn name(&self) -> String {
-        self.name.clone()
-    }
-
-    fn id(&self) -> &crate::execution::MemoryConsumerId {
-        &self.id
-    }
-
-    fn memory_manager(&self) -> Arc<MemoryManager> {
-        Arc::clone(&self.memory_manager)
-    }
-
-    fn type_(&self) -> &ConsumerType {
-        &ConsumerType::Tracking
-    }
-
-    async fn spill(&self) -> Result<usize, DataFusionError> {
-        Err(DataFusionError::ResourcesExhausted(format!(
-            "Cannot spill {}",
-            self.name
-        )))
-    }
-
-    fn mem_used(&self) -> usize {
-        self.used
-    }
-}
-
-impl Drop for MemoryConsumerProxy {
-    fn drop(&mut self) {
-        self.memory_manager
-            .drop_consumer(self.id(), self.mem_used());
-    }
-}
-
 /// Extension trait for [`Vec`] to account for allocations.
 pub trait VecAllocExt {
     /// Item type.
diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs
index 024980dee..5eb859df9 100644
--- a/datafusion/core/src/execution/mod.rs
+++ b/datafusion/core/src/execution/mod.rs
@@ -42,13 +42,10 @@
 
 pub mod context;
 pub mod disk_manager;
-pub mod memory_manager;
+pub mod memory_pool;
 pub mod options;
 pub mod registry;
 pub mod runtime_env;
 
 pub use disk_manager::DiskManager;
-pub use memory_manager::{
-    human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
-};
 pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 64da4a103..d559e7c7f 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -20,16 +20,14 @@
 
 use crate::{
     error::Result,
-    execution::{
-        disk_manager::{DiskManager, DiskManagerConfig},
-        memory_manager::{MemoryConsumerId, MemoryManager, MemoryManagerConfig},
-    },
+    execution::disk_manager::{DiskManager, DiskManagerConfig},
 };
 use std::collections::HashMap;
 
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::listing_table_factory::ListingTableFactory;
 use crate::datasource::object_store::ObjectStoreRegistry;
+use crate::execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool};
 use datafusion_common::DataFusionError;
 use object_store::ObjectStore;
 use std::fmt::{Debug, Formatter};
@@ -41,7 +39,7 @@ use url::Url;
 /// Execution runtime environment.
 pub struct RuntimeEnv {
     /// Runtime memory management
-    pub memory_manager: Arc<MemoryManager>,
+    pub memory_pool: Arc<dyn MemoryPool>,
     /// Manage temporary files during query execution
     pub disk_manager: Arc<DiskManager>,
     /// Object Store Registry
@@ -60,40 +58,23 @@ impl RuntimeEnv {
     /// Create env based on configuration
     pub fn new(config: RuntimeConfig) -> Result<Self> {
         let RuntimeConfig {
-            memory_manager,
+            memory_pool,
             disk_manager,
             object_store_registry,
             table_factories,
         } = config;
 
+        let memory_pool =
+            memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
+
         Ok(Self {
-            memory_manager: MemoryManager::new(memory_manager),
+            memory_pool,
             disk_manager: DiskManager::try_new(disk_manager)?,
             object_store_registry,
             table_factories,
         })
     }
 
-    /// Register the consumer to get it tracked
-    pub fn register_requester(&self, id: &MemoryConsumerId) {
-        self.memory_manager.register_requester(id);
-    }
-
-    /// Drop the consumer from get tracked, reclaim memory
-    pub fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize) {
-        self.memory_manager.drop_consumer(id, mem_used)
-    }
-
-    /// Grow tracker memory of `delta`
-    pub fn grow_tracker_usage(&self, delta: usize) {
-        self.memory_manager.grow_tracker_usage(delta)
-    }
-
-    /// Shrink tracker memory of `delta`
-    pub fn shrink_tracker_usage(&self, delta: usize) {
-        self.memory_manager.shrink_tracker_usage(delta)
-    }
-
     /// Registers a custom `ObjectStore` to be used when accessing a
     /// specific scheme and host. This allows DataFusion to create
     /// external tables from urls that do not have built in support
@@ -142,8 +123,10 @@ impl Default for RuntimeEnv {
 pub struct RuntimeConfig {
     /// DiskManager to manage temporary disk file usage
     pub disk_manager: DiskManagerConfig,
-    /// MemoryManager to limit access to memory
-    pub memory_manager: MemoryManagerConfig,
+    /// [`MemoryPool`] from which to allocate memory
+    ///
+    /// Defaults to using an [`UnboundedMemoryPool`] if `None`
+    pub memory_pool: Option<Arc<dyn MemoryPool>>,
     /// ObjectStoreRegistry to get object store based on url
     pub object_store_registry: Arc<ObjectStoreRegistry>,
     /// Custom table factories for things like deltalake that are not part of core datafusion
@@ -172,9 +155,9 @@ impl RuntimeConfig {
         self
     }
 
-    /// Customize memory manager
-    pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig) -> Self {
-        self.memory_manager = memory_manager;
+    /// Customize memory policy
+    pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
+        self.memory_pool = Some(memory_pool);
         self
     }
 
@@ -199,11 +182,12 @@ impl RuntimeConfig {
     /// Specify the total memory to use while running the DataFusion
     /// plan to `max_memory * memory_fraction` in bytes.
     ///
+    /// This defaults to using [`GreedyMemoryPool`]
+    ///
     /// Note DataFusion does not yet respect this limit in all cases.
     pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
-        self.with_memory_manager(
-            MemoryManagerConfig::try_new_limit(max_memory, memory_fraction).unwrap(),
-        )
+        let pool_size = (max_memory as f64 * memory_fraction) as usize;
+        self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
     }
 
     /// Use the specified path to create any needed temporary files
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 30bf6dc7e..09b6c6691 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -212,6 +212,7 @@
 /// DataFusion crate version
 pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");
 
+extern crate core;
 extern crate sqlparser;
 
 pub mod avro_to_arrow;
diff --git a/datafusion/core/src/physical_plan/aggregates/hash.rs b/datafusion/core/src/physical_plan/aggregates/hash.rs
index 4d1933080..64b21ecf9 100644
--- a/datafusion/core/src/physical_plan/aggregates/hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/hash.rs
@@ -29,10 +29,7 @@ use futures::stream::{Stream, StreamExt};
 
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::proxy::{
-    MemoryConsumerProxy, RawTableAllocExt, VecAllocExt,
-};
-use crate::execution::MemoryConsumerId;
+use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, AccumulatorItem, AggregateMode, PhysicalGroupBy,
 };
@@ -42,6 +39,7 @@ use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use crate::scalar::ScalarValue;
 
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use arrow::{array::ArrayRef, compute, compute::cast};
 use arrow::{
     array::{Array, UInt32Builder},
@@ -126,6 +124,10 @@ impl GroupedHashAggregateStream {
 
         timer.done();
 
+        let reservation =
+            MemoryConsumer::new(format!("GroupedHashAggregateStream[{}]", partition))
+                .register(context.memory_pool());
+
         let inner = GroupedHashAggregateStreamInner {
             schema: Arc::clone(&schema),
             mode,
@@ -135,11 +137,7 @@ impl GroupedHashAggregateStream {
             baseline_metrics,
             aggregate_expressions,
             accumulators: Some(Accumulators {
-                memory_consumer: MemoryConsumerProxy::new(
-                    "GroupBy Hash Accumulators",
-                    MemoryConsumerId::new(partition),
-                    Arc::clone(&context.runtime_env().memory_manager),
-                ),
+                reservation,
                 map: RawTable::with_capacity(0),
                 group_states: Vec::with_capacity(0),
             }),
@@ -175,15 +173,10 @@ impl GroupedHashAggregateStream {
                         // allocate memory
                         // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
                         // overshooting a bit. Also this means we either store the whole record batch or not.
-                        let result = match result {
-                            Ok(allocated) => {
-                                accumulators.memory_consumer.alloc(allocated).await
-                            }
-                            Err(e) => Err(e),
-                        };
-
-                        match result {
-                            Ok(()) => continue,
+                        match result.and_then(|allocated| {
+                            accumulators.reservation.try_grow(allocated)
+                        }) {
+                            Ok(_) => continue,
                             Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
                         }
                     }
@@ -445,7 +438,7 @@ struct GroupState {
 
 /// The state of all the groups
 struct Accumulators {
-    memory_consumer: MemoryConsumerProxy,
+    reservation: MemoryReservation,
 
     /// Logically maps group values to an index in `group_states`
     ///
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 64cc4f569..8a312abaf 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -18,8 +18,6 @@
 //! Aggregate without grouping columns
 
 use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::proxy::MemoryConsumerProxy;
-use crate::execution::MemoryConsumerId;
 use crate::physical_plan::aggregates::{
     aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem,
     AggregateMode,
@@ -35,6 +33,7 @@ use futures::stream::BoxStream;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use futures::stream::{Stream, StreamExt};
 
 /// stream struct for aggregation without grouping columns
@@ -55,7 +54,7 @@ struct AggregateStreamInner {
     baseline_metrics: BaselineMetrics,
     aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     accumulators: Vec<AccumulatorItem>,
-    memory_consumer: MemoryConsumerProxy,
+    reservation: MemoryReservation,
     finished: bool,
 }
 
@@ -69,14 +68,12 @@ impl AggregateStream {
         baseline_metrics: BaselineMetrics,
         context: Arc<TaskContext>,
         partition: usize,
-    ) -> datafusion_common::Result<Self> {
+    ) -> Result<Self> {
         let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
         let accumulators = create_accumulators(&aggr_expr)?;
-        let memory_consumer = MemoryConsumerProxy::new(
-            "GroupBy None Accumulators",
-            MemoryConsumerId::new(partition),
-            Arc::clone(&context.runtime_env().memory_manager),
-        );
+
+        let reservation = MemoryConsumer::new(format!("AggregateStream[{}]", partition))
+            .register(context.memory_pool());
 
         let inner = AggregateStreamInner {
             schema: Arc::clone(&schema),
@@ -85,7 +82,7 @@ impl AggregateStream {
             baseline_metrics,
             aggregate_expressions,
             accumulators,
-            memory_consumer,
+            reservation,
             finished: false,
         };
         let stream = futures::stream::unfold(inner, |mut this| async move {
@@ -111,12 +108,9 @@ impl AggregateStream {
                         // allocate memory
                         // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
                         // overshooting a bit. Also this means we either store the whole record batch or not.
-                        let result = match result {
-                            Ok(allocated) => this.memory_consumer.alloc(allocated).await,
-                            Err(e) => Err(e),
-                        };
-
-                        match result {
+                        match result
+                            .and_then(|allocated| this.reservation.try_grow(allocated))
+                        {
                             Ok(_) => continue,
                             Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
                         }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index c73fa3da0..e76939787 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -27,10 +27,7 @@ use futures::stream::{Stream, StreamExt};
 
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::proxy::{
-    MemoryConsumerProxy, RawTableAllocExt, VecAllocExt,
-};
-use crate::execution::MemoryConsumerId;
+use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
     PhysicalGroupBy,
@@ -40,6 +37,7 @@ use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 
+use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use arrow::compute::cast;
 use arrow::datatypes::Schema;
 use arrow::{array::ArrayRef, compute};
@@ -141,13 +139,12 @@ impl GroupedHashAggregateStreamV2 {
         let aggr_schema = aggr_state_schema(&aggr_expr)?;
 
         let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
+        let reservation =
+            MemoryConsumer::new(format!("GroupedHashAggregateStreamV2[{}]", partition))
+                .register(context.memory_pool());
 
         let aggr_state = AggregationState {
-            memory_consumer: MemoryConsumerProxy::new(
-                "GroupBy Hash (Row) AggregationState",
-                MemoryConsumerId::new(partition),
-                Arc::clone(&context.runtime_env().memory_manager),
-            ),
+            reservation,
             map: RawTable::with_capacity(0),
             group_states: Vec::with_capacity(0),
         };
@@ -196,15 +193,10 @@ impl GroupedHashAggregateStreamV2 {
                             // allocate memory
                             // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
                             // overshooting a bit. Also this means we either store the whole record batch or not.
-                            let result = match result {
-                                Ok(allocated) => {
-                                    this.aggr_state.memory_consumer.alloc(allocated).await
-                                }
-                                Err(e) => Err(e),
-                            };
-
-                            match result {
-                                Ok(()) => continue,
+                            match result.and_then(|allocated| {
+                                this.aggr_state.reservation.try_grow(allocated)
+                            }) {
+                                Ok(_) => continue,
                                 Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
                             }
                         }
@@ -465,7 +457,7 @@ struct RowGroupState {
 
 /// The state of all the groups
 struct AggregationState {
-    memory_consumer: MemoryConsumerProxy,
+    reservation: MemoryReservation,
 
     /// Logically maps group values to an index in `group_states`
     ///
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index b4db3a32b..b29dc0cb8 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -51,7 +51,7 @@ impl SizedRecordBatchStream {
     pub fn new(
         schema: SchemaRef,
         batches: Vec<Arc<RecordBatch>>,
-        metrics: MemTrackingMetrics,
+        mut metrics: MemTrackingMetrics,
     ) -> Self {
         let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
         metrics.init_mem_used(size);
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index ac350b183..077ed8dcc 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -152,7 +152,8 @@ impl ExecutionPlan for ExplainExec {
         )?;
 
         let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);
+        let tracking_metrics =
+            MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
 
         debug!(
             "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
diff --git a/datafusion/core/src/physical_plan/metrics/composite.rs b/datafusion/core/src/physical_plan/metrics/composite.rs
index cd4d5c38a..3c257805d 100644
--- a/datafusion/core/src/physical_plan/metrics/composite.rs
+++ b/datafusion/core/src/physical_plan/metrics/composite.rs
@@ -17,7 +17,7 @@
 
 //! Metrics common for complex operators with multiple steps.
 
-use crate::execution::runtime_env::RuntimeEnv;
+use crate::execution::memory_pool::MemoryPool;
 use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
 use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
@@ -32,7 +32,7 @@ use std::time::Duration;
 /// Collects all metrics during a complex operation, which is composed of multiple steps and
 /// each stage reports its statistics separately.
 /// Give sort as an example, when the dataset is more significant than available memory, it will report
-/// multiple in-mem sort metrics and final merge-sort  metrics from `SortPreservingMergeStream`.
+/// multiple in-mem sort metrics and final merge-sort metrics from `SortPreservingMergeStream`.
 /// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation),
 /// and which are intermediate metrics that we only account for elapsed_compute time.
 pub struct CompositeMetricsSet {
@@ -69,18 +69,18 @@ impl CompositeMetricsSet {
     pub fn new_intermediate_tracking(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        pool: &Arc<dyn MemoryPool>,
     ) -> MemTrackingMetrics {
-        MemTrackingMetrics::new_with_rt(&self.mid, partition, runtime)
+        MemTrackingMetrics::new(&self.mid, pool, partition)
     }
 
     /// create a new final memory tracking metrics
     pub fn new_final_tracking(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        pool: &Arc<dyn MemoryPool>,
     ) -> MemTrackingMetrics {
-        MemTrackingMetrics::new_with_rt(&self.final_, partition, runtime)
+        MemTrackingMetrics::new(&self.final_, pool, partition)
     }
 
     fn merge_compute_time(&self, dest: &Time) {
diff --git a/datafusion/core/src/physical_plan/metrics/tracker.rs b/datafusion/core/src/physical_plan/metrics/tracker.rs
index d8017b95a..c61398c65 100644
--- a/datafusion/core/src/physical_plan/metrics/tracker.rs
+++ b/datafusion/core/src/physical_plan/metrics/tracker.rs
@@ -17,51 +17,37 @@
 
 //! Metrics with memory usage tracking capability
 
-use crate::execution::runtime_env::RuntimeEnv;
-use crate::execution::MemoryConsumerId;
 use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
 };
 use std::sync::Arc;
 use std::task::Poll;
 
+use crate::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 
-/// Simplified version of tracking memory consumer,
-/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking)
-///
-/// You could use this to replace [BaselineMetrics], report the memory,
-/// and get the memory usage bookkeeping in the memory manager easily.
+/// Wraps a [`BaselineMetrics`] and records memory usage on a [`MemoryReservation`]
 #[derive(Debug)]
 pub struct MemTrackingMetrics {
-    id: MemoryConsumerId,
-    runtime: Option<Arc<RuntimeEnv>>,
+    reservation: MemoryReservation,
     metrics: BaselineMetrics,
 }
 
 /// Delegates most of the metrics functionalities to the inner BaselineMetrics,
 /// intercept memory metrics functionalities and do memory manager bookkeeping.
 impl MemTrackingMetrics {
-    /// Create metrics similar to [BaselineMetrics]
-    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
-        let id = MemoryConsumerId::new(partition);
-        Self {
-            id,
-            runtime: None,
-            metrics: BaselineMetrics::new(metrics, partition),
-        }
-    }
-
-    /// Create memory tracking metrics with reference to runtime
-    pub fn new_with_rt(
+    /// Create memory tracking metrics with reference to memory manager
+    pub fn new(
         metrics: &ExecutionPlanMetricsSet,
+        pool: &Arc<dyn MemoryPool>,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
     ) -> Self {
-        let id = MemoryConsumerId::new(partition);
+        let reservation =
+            MemoryConsumer::new(format!("MemTrackingMetrics[{}]", partition))
+                .register(pool);
+
         Self {
-            id,
-            runtime: Some(runtime),
+            reservation,
             metrics: BaselineMetrics::new(metrics, partition),
         }
     }
@@ -77,11 +63,9 @@ impl MemTrackingMetrics {
     }
 
     /// setup initial memory usage and register it with memory manager
-    pub fn init_mem_used(&self, size: usize) {
+    pub fn init_mem_used(&mut self, size: usize) {
         self.metrics.mem_used().set(size);
-        if let Some(rt) = self.runtime.as_ref() {
-            rt.memory_manager.grow_tracker_usage(size);
-        }
+        self.reservation.resize(size)
     }
 
     /// return the metric for the total number of output rows produced
@@ -118,14 +102,3 @@ impl MemTrackingMetrics {
         self.metrics.record_poll(poll)
     }
 }
-
-impl Drop for MemTrackingMetrics {
-    fn drop(&mut self) {
-        self.metrics.try_done();
-        if self.mem_used() != 0 {
-            if let Some(rt) = self.runtime.as_ref() {
-                rt.drop_consumer(&self.id, self.mem_used());
-            }
-        }
-    }
-}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index b6c37d109..85eca5450 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -21,8 +21,8 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::{
-    human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+use crate::execution::memory_pool::{
+    human_readable_size, MemoryConsumer, MemoryReservation,
 };
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
@@ -45,9 +45,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
 use datafusion_physical_expr::EquivalenceProperties;
-use futures::lock::Mutex;
 use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
 use log::{debug, error};
 use std::any::Any;
@@ -73,10 +71,9 @@ use tokio::task;
 ///     buffer the batch in memory, go to 1.
 /// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
 struct ExternalSorter {
-    id: MemoryConsumerId,
     schema: SchemaRef,
-    in_mem_batches: Mutex<Vec<BatchWithSortArray>>,
-    spills: Mutex<Vec<NamedTempFile>>,
+    in_mem_batches: Vec<BatchWithSortArray>,
+    spills: Vec<NamedTempFile>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     session_config: Arc<SessionConfig>,
@@ -84,6 +81,8 @@ struct ExternalSorter {
     metrics_set: CompositeMetricsSet,
     metrics: BaselineMetrics,
     fetch: Option<usize>,
+    reservation: MemoryReservation,
+    partition_id: usize,
 }
 
 impl ExternalSorter {
@@ -97,31 +96,40 @@ impl ExternalSorter {
         fetch: Option<usize>,
     ) -> Self {
         let metrics = metrics_set.new_intermediate_baseline(partition_id);
+
+        let reservation =
+            MemoryConsumer::new(format!("ExternalSorter[{}]", partition_id))
+                .with_can_spill(true)
+                .register(&runtime.memory_pool);
+
         Self {
-            id: MemoryConsumerId::new(partition_id),
             schema,
-            in_mem_batches: Mutex::new(vec![]),
-            spills: Mutex::new(vec![]),
+            in_mem_batches: vec![],
+            spills: vec![],
             expr,
             session_config,
             runtime,
             metrics_set,
             metrics,
             fetch,
+            reservation,
+            partition_id,
         }
     }
 
     async fn insert_batch(
-        &self,
+        &mut self,
         input: RecordBatch,
         tracking_metrics: &MemTrackingMetrics,
     ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
-            debug!("Inserting {} rows of {} bytes", input.num_rows(), size);
-            self.try_grow(size).await?;
+            if self.reservation.try_grow(size).is_err() {
+                self.spill().await?;
+                self.reservation.try_grow(size)?
+            }
+
             self.metrics.mem_used().add(size);
-            let mut in_mem_batches = self.in_mem_batches.lock().await;
             // NB timer records time taken on drop, so there are no
             // calls to `timer.done()` below.
             let _timer = tracking_metrics.elapsed_compute().timer();
@@ -136,61 +144,56 @@ impl ExternalSorter {
                     // We don't have to call try_grow here, since we have already used the
                     // memory (so spilling right here wouldn't help at all for the current
                     // operation). But we still have to record it so that other requesters
-                    // would know about this unexpected increase in memory consuption.
+                    // would know about this unexpected increase in memory consumption.
                     let new_size_delta = new_size - size;
-                    self.grow(new_size_delta);
+                    self.reservation.grow(new_size_delta);
                     self.metrics.mem_used().add(new_size_delta);
                 }
                 Ordering::Less => {
                     let size_delta = size - new_size;
-                    self.shrink(size_delta);
+                    self.reservation.shrink(size_delta);
                     self.metrics.mem_used().sub(size_delta);
                 }
                 Ordering::Equal => {}
             }
-            in_mem_batches.push(partial);
+            self.in_mem_batches.push(partial);
         }
         Ok(())
     }
 
-    async fn spilled_before(&self) -> bool {
-        let spills = self.spills.lock().await;
-        !spills.is_empty()
+    fn spilled_before(&self) -> bool {
+        !self.spills.is_empty()
     }
 
     /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
-    async fn sort(&self) -> Result<SendableRecordBatchStream> {
-        let partition = self.partition_id();
+    fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         let batch_size = self.session_config.batch_size();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
 
-        if self.spilled_before().await {
+        if self.spilled_before() {
             let tracking_metrics = self
                 .metrics_set
-                .new_intermediate_tracking(partition, self.runtime.clone());
+                .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
             let mut streams: Vec<SortedStream> = vec![];
-            if in_mem_batches.len() > 0 {
+            if !self.in_mem_batches.is_empty() {
                 let in_mem_stream = in_mem_partial_sort(
-                    &mut in_mem_batches,
+                    &mut self.in_mem_batches,
                     self.schema.clone(),
                     &self.expr,
                     batch_size,
                     tracking_metrics,
                     self.fetch,
                 )?;
-                let prev_used = self.free_all_memory();
+                let prev_used = self.reservation.free();
                 streams.push(SortedStream::new(in_mem_stream, prev_used));
             }
 
-            let mut spills = self.spills.lock().await;
-
-            for spill in spills.drain(..) {
+            for spill in self.spills.drain(..) {
                 let stream = read_spill_as_stream(spill, self.schema.clone())?;
                 streams.push(SortedStream::new(stream, 0));
             }
             let tracking_metrics = self
                 .metrics_set
-                .new_final_tracking(partition, self.runtime.clone());
+                .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
             Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
                 streams,
                 self.schema.clone(),
@@ -198,12 +201,12 @@ impl ExternalSorter {
                 tracking_metrics,
                 self.session_config.batch_size(),
             )?))
-        } else if in_mem_batches.len() > 0 {
+        } else if !self.in_mem_batches.is_empty() {
             let tracking_metrics = self
                 .metrics_set
-                .new_final_tracking(partition, self.runtime.clone());
+                .new_final_tracking(self.partition_id, &self.runtime.memory_pool);
             let result = in_mem_partial_sort(
-                &mut in_mem_batches,
+                &mut self.in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
                 batch_size,
@@ -211,19 +214,13 @@ impl ExternalSorter {
                 self.fetch,
             );
             // Report to the memory manager we are no longer using memory
-            self.free_all_memory();
+            self.reservation.free();
             result
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
     }
 
-    fn free_all_memory(&self) -> usize {
-        let used = self.metrics.mem_used().set(0);
-        self.shrink(used);
-        used
-    }
-
     fn used(&self) -> usize {
         self.metrics.mem_used().value()
     }
@@ -235,66 +232,22 @@ impl ExternalSorter {
     fn spill_count(&self) -> usize {
         self.metrics.spill_count().value()
     }
-}
-
-impl Debug for ExternalSorter {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ExternalSorter")
-            .field("id", &self.id())
-            .field("memory_used", &self.used())
-            .field("spilled_bytes", &self.spilled_bytes())
-            .field("spill_count", &self.spill_count())
-            .finish()
-    }
-}
 
-impl Drop for ExternalSorter {
-    fn drop(&mut self) {
-        self.runtime.drop_consumer(self.id(), self.used());
-    }
-}
-
-#[async_trait]
-impl MemoryConsumer for ExternalSorter {
-    fn name(&self) -> String {
-        "ExternalSorter".to_owned()
-    }
-
-    fn id(&self) -> &MemoryConsumerId {
-        &self.id
-    }
-
-    fn memory_manager(&self) -> Arc<MemoryManager> {
-        self.runtime.memory_manager.clone()
-    }
-
-    fn type_(&self) -> &ConsumerType {
-        &ConsumerType::Requesting
-    }
-
-    async fn spill(&self) -> Result<usize> {
-        let partition = self.partition_id();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
+    async fn spill(&mut self) -> Result<usize> {
         // we could always get a chance to free some memory as long as we are holding some
-        if in_mem_batches.len() == 0 {
+        if self.in_mem_batches.is_empty() {
             return Ok(0);
         }
 
-        debug!(
-            "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
-            self.name(),
-            self.id(),
-            self.used(),
-            self.spill_count()
-        );
+        debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
         let tracking_metrics = self
             .metrics_set
-            .new_intermediate_tracking(partition, self.runtime.clone());
+            .new_intermediate_tracking(self.partition_id, &self.runtime.memory_pool);
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let stream = in_mem_partial_sort(
-            &mut in_mem_batches,
+            &mut self.in_mem_batches,
             self.schema.clone(),
             &self.expr,
             self.session_config.batch_size(),
@@ -304,15 +257,21 @@ impl MemoryConsumer for ExternalSorter {
 
         spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone())
             .await?;
-        let mut spills = self.spills.lock().await;
+        self.reservation.free();
         let used = self.metrics.mem_used().set(0);
         self.metrics.record_spill(used);
-        spills.push(spillfile);
+        self.spills.push(spillfile);
         Ok(used)
     }
+}
 
-    fn mem_used(&self) -> usize {
-        self.metrics.mem_used().value()
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spill_count", &self.spill_count())
+            .finish()
     }
 }
 
@@ -528,7 +487,7 @@ impl SortedSizedRecordBatchStream {
         schema: SchemaRef,
         batches: Vec<RecordBatch>,
         sorted_iter: SortedIterator,
-        metrics: MemTrackingMetrics,
+        mut metrics: MemTrackingMetrics,
     ) -> Self {
         let size = batches.iter().map(batch_byte_size).sum::<usize>()
             + sorted_iter.memory_size();
@@ -911,8 +870,8 @@ async fn do_sort(
     );
     let schema = input.schema();
     let tracking_metrics =
-        metrics_set.new_intermediate_tracking(partition_id, context.runtime_env());
-    let sorter = ExternalSorter::new(
+        metrics_set.new_intermediate_tracking(partition_id, context.memory_pool());
+    let mut sorter = ExternalSorter::new(
         partition_id,
         schema.clone(),
         expr,
@@ -921,12 +880,11 @@ async fn do_sort(
         context.runtime_env(),
         fetch,
     );
-    context.runtime_env().register_requester(sorter.id());
     while let Some(batch) = input.next().await {
         let batch = batch?;
         sorter.insert_batch(batch, &tracking_metrics).await?;
     }
-    let result = sorter.sort().await;
+    let result = sorter.sort();
     debug!(
         "End do_sort for partition {} of context session_id {} and task_id {:?}",
         partition_id,
@@ -1005,10 +963,7 @@ mod tests {
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
         assert_eq!(
-            session_ctx
-                .runtime_env()
-                .memory_manager
-                .get_requester_total(),
+            session_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory manager"
         );
@@ -1077,10 +1032,7 @@ mod tests {
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
         assert_eq!(
-            session_ctx
-                .runtime_env()
-                .memory_manager
-                .get_requester_total(),
+            session_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory manager"
         );
@@ -1100,7 +1052,7 @@ mod tests {
             // all the batches we are processing, we expect it to spill.
             (None, true),
             // When we have a limit however, the buffered size of batches should fit in memory
-            // since it is much lover than the total size of the input batch.
+            // since it is much lower than the total size of the input batch.
             (Some(1), false),
         ];
 
@@ -1331,10 +1283,7 @@ mod tests {
         assert_strong_count_converges_to_zero(refs).await;
 
         assert_eq!(
-            session_ctx
-                .runtime_env()
-                .memory_manager
-                .get_requester_total(),
+            session_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory manager"
         );
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 212c4c955..f069cc5b0 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -169,7 +169,8 @@ impl ExecutionPlan for SortPreservingMergeExec {
             )));
         }
 
-        let tracking_metrics = MemTrackingMetrics::new(&self.metrics, partition);
+        let tracking_metrics =
+            MemTrackingMetrics::new(&self.metrics, context.memory_pool(), partition);
 
         let input_partitions = self.input.output_partitioning().partition_count();
         debug!(
@@ -342,7 +343,7 @@ impl SortPreservingMergeStream {
         streams: Vec<SortedStream>,
         schema: SchemaRef,
         expressions: &[PhysicalSortExpr],
-        tracking_metrics: MemTrackingMetrics,
+        mut tracking_metrics: MemTrackingMetrics,
         batch_size: usize,
     ) -> Result<Self> {
         let stream_count = streams.len();
@@ -1258,7 +1259,8 @@ mod tests {
         }
 
         let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics = MemTrackingMetrics::new(&metrics, 0);
+        let tracking_metrics =
+            MemTrackingMetrics::new(&metrics, task_ctx.memory_pool(), 0);
 
         let merge_stream = SortPreservingMergeStream::new_from_streams(
             streams,
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 20ad555d6..91d66e884 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -19,14 +19,13 @@
 
 use std::sync::Arc;
 
-use arrow::record_batch::RecordBatch;
 use datafusion::datasource::MemTable;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion_common::assert_contains;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::{stagger_batch, AccessLogGenerator};
+use test_utils::AccessLogGenerator;
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -39,6 +38,7 @@ async fn oom_sort() {
     run_limit_test(
         "select * from t order by host DESC",
         "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
+        200_000,
     )
     .await
 }
@@ -47,7 +47,8 @@ async fn oom_sort() {
 async fn group_by_none() {
     run_limit_test(
         "select median(image) from t",
-        "Resources exhausted: Cannot spill GroupBy None Accumulators",
+        "Resources exhausted: Failed to allocate additional",
+        20_000,
     )
     .await
 }
@@ -56,7 +57,8 @@ async fn group_by_none() {
 async fn group_by_row_hash() {
     run_limit_test(
         "select count(*) from t GROUP BY response_bytes",
-        "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState",
+        "Resources exhausted: Failed to allocate additional",
+        2_000,
     )
     .await
 }
@@ -66,23 +68,21 @@ async fn group_by_hash() {
     run_limit_test(
         // group by dict column
         "select count(*) from t GROUP BY service, host, pod, container",
-        "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
+        "Resources exhausted: Failed to allocate additional",
+        1_000,
     )
     .await
 }
 
 /// 50 byte memory limit
-const MEMORY_LIMIT_BYTES: usize = 50;
 const MEMORY_FRACTION: f64 = 0.95;
 
 /// runs the specified query against 1000 rows with a 50
 /// byte memory limit and no disk manager enabled.
-async fn run_limit_test(query: &str, expected_error: &str) {
-    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
-
-    let batches: Vec<RecordBatch> = generator
-        // split up into more than one batch, as the size limit in sort is not enforced until the second batch
-        .flat_map(stagger_batch)
+async fn run_limit_test(query: &str, expected_error: &str, memory_limit: usize) {
+    let batches: Vec<_> = AccessLogGenerator::new()
+        .with_row_limit(1000)
+        .with_max_batch_size(50)
         .collect();
 
     let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap();
@@ -91,7 +91,7 @@ async fn run_limit_test(query: &str, expected_error: &str) {
         // do not allow spilling
         .with_disk_manager(DiskManagerConfig::Disabled)
         // Only allow 50 bytes
-        .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION);
+        .with_memory_limit(memory_limit, MEMORY_FRACTION);
 
     let runtime = RuntimeEnv::new(rt_config).unwrap();
 
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index cc700d5d2..923b44e26 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -22,7 +22,7 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::memory_manager::MemoryManagerConfig;
+use datafusion::execution::memory_pool::GreedyMemoryPool;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
 use datafusion::physical_plan::memory::MemoryExec;
@@ -31,18 +31,18 @@ use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use rand::Rng;
 use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch_with_seed};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec};
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_1k_mem() {
-    run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await
+    run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await
 }
 
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_100k_mem() {
-    run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await
+    run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await
 }
 
 #[tokio::test]
@@ -76,9 +76,8 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
         let exec = MemoryExec::try_new(&input, schema, None).unwrap();
         let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap());
 
-        let runtime_config = RuntimeConfig::new().with_memory_manager(
-            MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(),
-        );
+        let runtime_config = RuntimeConfig::new()
+            .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
         let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
         let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
 
@@ -95,12 +94,9 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
         }
 
         assert_eq!(
-            session_ctx
-                .runtime_env()
-                .memory_manager
-                .get_requester_total(),
+            session_ctx.runtime_env().memory_pool.reserved(),
             0,
-            "The sort should have returned all memory used back to the memory manager"
+            "The sort should have returned all memory used back to the memory pool"
         );
         assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
     }
@@ -110,12 +106,23 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
 /// with randomized i32 content
 fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
     let mut rng = rand::thread_rng();
-    let mut input: Vec<i32> = vec![0; len];
-    rng.fill(&mut input[..]);
-    let input = Int32Array::from_iter_values(input.into_iter());
-
-    // split into several record batches
-    let batch =
-        RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
-    stagger_batch_with_seed(batch, 42)
+    let max_batch = 1024;
+
+    let mut batches = vec![];
+    let mut remaining = len;
+    while remaining != 0 {
+        let to_read = rng.gen_range(0..=remaining.min(max_batch));
+        remaining -= to_read;
+
+        batches.push(
+            RecordBatch::try_from_iter(vec![(
+                "x",
+                Arc::new(Int32Array::from_iter_values(
+                    std::iter::from_fn(|| Some(rng.gen())).take(to_read),
+                )) as ArrayRef,
+            )])
+            .unwrap(),
+        )
+    }
+    batches
 }
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index 999becafd..fc74d7ded 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -57,7 +57,7 @@ async fn single_file() {
 
     let tempdir = TempDir::new().unwrap();
 
-    let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
+    let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
 
     // default properties
     let props = WriterProperties::builder().build();
@@ -236,7 +236,7 @@ async fn single_file() {
 async fn single_file_small_data_pages() {
     let tempdir = TempDir::new().unwrap();
 
-    let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
+    let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
 
     // set the max page rows with arbitrary sizes 8311 to increase
     // effectiveness of page filtering
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index 7276820f6..13160fd52 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -90,10 +90,11 @@ impl ExecutionPlan for CustomPlan {
     fn execute(
         &self,
         partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);
+        let tracking_metrics =
+            MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
         Ok(Box::pin(SizedRecordBatchStream::new(
             self.schema(),
             self.batches.clone(),
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index c82d56ef2..19db65400 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -78,6 +78,13 @@ impl BatchBuilder {
         ]))
     }
 
+    fn is_finished(&self) -> bool {
+        self.row_limit
+            .as_ref()
+            .map(|x| *x <= self.row_count)
+            .unwrap_or_default()
+    }
+
     fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
         let num_pods = rng.gen_range(1..15);
         let pods = generate_sorted_strings(rng, num_pods, 30..40);
@@ -91,6 +98,10 @@ impl BatchBuilder {
 
                 let num_entries = rng.gen_range(1024..8192);
                 for i in 0..num_entries {
+                    if self.is_finished() {
+                        return;
+                    }
+
                     let time = i as i64 * 1024;
                     self.append_row(rng, host, &pod, service, &container, &image, time);
                 }
@@ -109,12 +120,6 @@ impl BatchBuilder {
         image: &str,
         time: i64,
     ) {
-        // skip if over limit
-        if let Some(limit) = self.row_limit {
-            if self.row_count >= limit {
-                return;
-            }
-        }
         self.row_count += 1;
 
         let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
@@ -237,7 +242,9 @@ pub struct AccessLogGenerator {
     rng: StdRng,
     host_idx: usize,
     /// optional  number of rows produced
-    row_limit: Option<usize>,
+    row_limit: usize,
+    /// maximum rows per batch
+    max_batch_size: usize,
     /// How many rows have been returned so far
     row_count: usize,
 }
@@ -259,7 +266,8 @@ impl AccessLogGenerator {
             schema: BatchBuilder::schema(),
             host_idx: 0,
             rng: StdRng::from_seed(seed),
-            row_limit: None,
+            row_limit: usize::MAX,
+            max_batch_size: usize::MAX,
             row_count: 0,
         }
     }
@@ -269,8 +277,14 @@ impl AccessLogGenerator {
         self.schema.clone()
     }
 
+    /// Limit the maximum batch size
+    pub fn with_max_batch_size(mut self, batch_size: usize) -> Self {
+        self.max_batch_size = batch_size;
+        self
+    }
+
     /// Return up to row_limit rows;
-    pub fn with_row_limit(mut self, row_limit: Option<usize>) -> Self {
+    pub fn with_row_limit(mut self, row_limit: usize) -> Self {
         self.row_limit = row_limit;
         self
     }
@@ -280,15 +294,13 @@ impl Iterator for AccessLogGenerator {
     type Item = RecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // if we have a limit and have passed it, stop generating
-        if let Some(limit) = self.row_limit {
-            if self.row_count >= limit {
-                return None;
-            }
+        if self.row_count == self.row_limit {
+            return None;
         }
 
-        let mut builder = BatchBuilder::default()
-            .with_row_limit(self.row_limit.map(|limit| limit - self.row_count));
+        let mut builder = BatchBuilder::default().with_row_limit(Some(
+            self.max_batch_size.min(self.row_limit - self.row_count),
+        ));
 
         let host = format!(
             "i-{:016x}.ec2.internal",
@@ -300,19 +312,14 @@ impl Iterator for AccessLogGenerator {
             if self.rng.gen_bool(0.5) {
                 continue;
             }
+            if builder.is_finished() {
+                break;
+            }
             builder.append(&mut self.rng, &host, service);
         }
 
         let batch = builder.finish(Arc::clone(&self.schema));
 
-        // limit batch if needed to stay under row limit
-        let batch = if let Some(limit) = self.row_limit {
-            let num_rows = limit - self.row_count;
-            batch.slice(0, num_rows)
-        } else {
-            batch
-        };
-
         self.row_count += batch.num_rows();
         Some(batch)
     }