Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/09 14:09:41 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1526: A simplified memory manager for query execution

alamb commented on a change in pull request #1526:
URL: https://github.com/apache/arrow-datafusion/pull/1526#discussion_r780779376



##########
File path: ballista/rust/executor/src/collect.rs
##########
@@ -75,11 +76,12 @@ impl ExecutionPlan for CollectExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        runtime: Arc<RuntimeEnv>,

Review comment:
       I think threading a parameter like this all the way through makes sense šŸ‘
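
    A minimal sketch of that pattern (hypothetical helper; it assumes the PR's `ExecutionPlan`, `RecordBatchStream`, `RuntimeEnv` and `Result` types are in scope, and mirrors the return type shown in the hunk above):

    ```
    use std::pin::Pin;
    use std::sync::Arc;

    // Sketch only: an operator forwards the shared runtime to each child it
    // executes; with several children the Arc is cloned once per child, so
    // the whole plan tree sees the same memory/disk managers.
    async fn execute_child(
        child: Arc<dyn ExecutionPlan>,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
        child.execute(partition, runtime).await
    }
    ```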

##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution runtime environment that tracks memory, disk and various configurations
+//! that are used during physical plan execution.
+
+use crate::error::Result;
+use crate::execution::disk_manager::DiskManager;
+use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+/// Execution runtime environment
+pub struct RuntimeEnv {
+    /// Runtime configuration
+    pub config: RuntimeConfig,
+    /// Runtime memory management
+    pub memory_manager: Arc<MemoryManager>,
+    /// Manage temporary files during query execution
+    pub disk_manager: Arc<DiskManager>,
+    /// If runtime env has initialized
+    initialized: Arc<Mutex<bool>>,
+}
+
+impl Debug for RuntimeEnv {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RuntimeEnv")
+    }
+}
+
+impl RuntimeEnv {
+    /// Create env based on configuration
+    #[allow(clippy::mutex_atomic)]
+    pub fn new(config: RuntimeConfig) -> Result<Self> {
+        let memory_manager = Arc::new(MemoryManager::new(
+            (config.max_memory as f64 * config.memory_fraction) as usize,
+        ));
+        let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?);
+        Ok(Self {
+            config,
+            memory_manager,
+            disk_manager,
+            initialized: Arc::new(Mutex::new(false)),
+        })
+    }
+
+    /// Get execution batch size based on config
+    pub fn batch_size(&self) -> usize {
+        self.config.batch_size
+    }
+
+    /// Register the consumer to get it tracked
+    pub fn register_consumer(&self, memory_consumer: Arc<dyn MemoryConsumer>) {
+        {
+            let mut initialized = self.initialized.lock().unwrap();
+            if !*initialized {
+                self.memory_manager.initialize();
+                *initialized = true;
+            }
+        }
+        self.memory_manager.register_consumer(memory_consumer);
+    }
+
+    /// Drop the consumer from get tracked
+    pub fn drop_consumer(&self, id: &MemoryConsumerId) {
+        self.memory_manager.drop_consumer(id)
+    }
+}
+
+impl Default for RuntimeEnv {
+    fn default() -> Self {
+        RuntimeEnv::new(RuntimeConfig::new()).unwrap()
+    }
+}
+
+#[derive(Clone)]
+/// Execution runtime configuration
+pub struct RuntimeConfig {
+    /// Default batch size while creating new batches, it's especially useful
+    /// for buffer-in-memory batches since creating tiny batches would results
+    /// in too much metadata memory consumption.
+    pub batch_size: usize,
+    /// Max execution memory allowed for DataFusion.

Review comment:
       ```suggestion
       /// Max execution memory allowed for DataFusion. Defaults
       /// to `usize::MAX`
   ```

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// consumers that can grow or shrink its memory usage during execution
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// consumers that are not spillable, counting in for only tracking purpose.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory,
+    /// this may trigger spill before grow when the memory threshold is
+    /// reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. User designates max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1).
+   The actual max memory DataFusion could use `pool_size =  max_memory * memory_fraction`.
+2. The entities that take up memory during its execution are called 'Memory Consumers'. Operators or others are encouraged to
+   register themselves to the memory manager and report its usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that would acquire memory during its execution and release memory through `spill` if no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer could acquire a maximum of
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {

Review comment:
       I think the idea of periodically polling tracking consumers is reasonable.
   
    I am a little worried about a task that polls based on some clock interval, however -- it is likely that the frequency will be too fast or too slow.
   
    What about updating the tracking consumers on every call to `try_grow`, or on every query to the memory manager for total memory used?
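
    To make that concrete, a rough sketch (hypothetical, not part of this PR) of a pull-based check that recomputes tracker usage at the moment `try_grow` is called, assuming the `MemoryConsumer` and `MemoryConsumerId` types from this file:

    ```
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // Sketch of a pull-based grow check: tracker usage is summed on demand
    // rather than refreshed by a background interval task.
    fn can_grow(
        required: usize,
        current: usize,
        pool_size: usize,
        trackers: &Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>,
        num_controlling_consumers: usize,
    ) -> bool {
        // Recompute the tracking total right now, so the decision never uses
        // a value that is a whole polling interval out of date.
        let tracking_used: usize = trackers
            .lock()
            .unwrap()
            .values()
            .map(|c| c.mem_used())
            .sum();
        let per_consumer =
            pool_size.saturating_sub(tracking_used) / num_controlling_consumers.max(1);
        required + current < per_consumer
    }
    ```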

##########
File path: datafusion/src/execution/disk_manager.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages files generated during query execution, files are
+//! hashed among the directories listed in RuntimeConfig::local_dirs.
+
+use crate::error::{DataFusionError, Result};
+use log::info;
+use std::collections::hash_map::DefaultHasher;
+use std::fs;
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::path::{Path, PathBuf};
+use uuid::Uuid;

Review comment:
       Given DataFusion already uses `rand`, I think we could avoid the `uuid` dependency by using it directly
   
   Something like:
   
   
   ```
   /// Return 10 character random string
   pub fn dir_name() -> String {
       thread_rng()
           .sample_iter(&Alphanumeric)
           .take(10)
           .map(char::from)
           .collect()
   }
   ```
   
   From https://github.com/influxdata/influxdb_iox/blob/3cda6b6c0f5c72b710fa73aec581ec079700fd11/influxdb_iox/tests/end_to_end_cases/scenario.rs#L286-L293
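
    (For reference, that snippet needs rand's `thread_rng`, the `Rng` trait for `sample_iter`, and the `Alphanumeric` distribution in scope:)

    ```
    use rand::{distributions::Alphanumeric, thread_rng, Rng};
    ```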
   

##########
File path: datafusion/src/execution/disk_manager.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages files generated during query execution, files are
+//! hashed among the directories listed in RuntimeConfig::local_dirs.
+
+use crate::error::{DataFusionError, Result};
+use log::info;
+use std::collections::hash_map::DefaultHasher;
+use std::fs;
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::path::{Path, PathBuf};
+use uuid::Uuid;
+
+/// Manages files generated during query execution, e.g. spill files generated
+/// while processing dataset larger than available memory.
+pub struct DiskManager {
+    local_dirs: Vec<String>,
+}
+
+impl DiskManager {
+    /// Create local dirs inside user provided dirs through conf
+    pub fn new(conf_dirs: &[String]) -> Result<Self> {

Review comment:
       Since this PR adds the `tempdir` dependency to datafusion, I think you could probably use `TempDir::new` and `TempDir::new_in` here rather than managing `String`s directly
   
   https://docs.rs/tempdir/latest/tempdir/struct.TempDir.html#method.new_in
   
    That would avoid the need for the directory-creation retry, and it would also handle cleaning up the files after execution
   
    I think we could switch to using `TempDir`s in a follow-up PR, however, as the API here is solid
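
    As a rough sketch of that direction (hypothetical type and method names, built on the `tempfile` crate that the `Default` impl in this PR already uses):

    ```
    use std::io;
    use tempfile::{Builder, NamedTempFile, TempDir};

    // Sketch only: a disk manager that owns TempDir handles, so the
    // directories (and anything left in them) are removed on drop.
    struct TempDirDiskManager {
        local_dirs: Vec<TempDir>,
    }

    impl TempDirDiskManager {
        fn new(conf_dirs: &[String]) -> io::Result<Self> {
            let local_dirs = conf_dirs
                .iter()
                .map(|dir| Builder::new().prefix("datafusion-").tempdir_in(dir))
                .collect::<io::Result<Vec<_>>>()?;
            Ok(Self { local_dirs })
        }

        fn create_tmp_file(&self) -> io::Result<NamedTempFile> {
            // First dir only for brevity (the PR hashes across dirs); the
            // returned handle deletes the file when it is dropped.
            NamedTempFile::new_in(self.local_dirs[0].path())
        }
    }
    ```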

##########
File path: datafusion/src/error.rs
##########
@@ -129,6 +132,9 @@ impl Display for DataFusionError {
             DataFusionError::Execution(ref desc) => {
                 write!(f, "Execution error: {}", desc)
             }
+            DataFusionError::ResourcesExhausted(ref desc) => {

Review comment:
       šŸ‘ 

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// consumers that can grow or shrink its memory usage during execution
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// consumers that are not spillable, counting in for only tracking purpose.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory,
+    /// this may trigger spill before grow when the memory threshold is
+    /// reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {

Review comment:
       I am a little confused about why `try_grow` and `spill` are part of this trait
   
    The main use of this trait seems to be for the `MemoryManager` to interact with `ExecutionPlans` (to ask them for their memory usage, etc.), but `try_grow` and `spill` call back to the memory manager
   
   By calling back to the `MemoryManager`, I think this means there is a Ref count cycle between `ExecutionPlan` and `MemoryManager` (they both have `Arc`s pointing at each other, so the ref counts will never go to zero)
   
   Looking at how `try_grow` and `spill` are used in `ExternalSort`, those methods aren't called from the `MemoryManager` at all.
   
    Thus, I suggest removing the `try_grow`, `memory_manager()` and `spill` methods from this trait (and putting them directly into `ExternalSort`)
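
    A sketch of how that separation might look (hypothetical names; assumes this file's `MemoryConsumerId`, `ConsumerType` and `MemoryManager` types):

    ```
    use std::fmt::Debug;
    use std::sync::{Arc, Weak};

    // Reporting-only trait: the manager can ask consumers what they use,
    // but consumers no longer hold an Arc back to the manager.
    trait MemoryReporter: Send + Sync + Debug {
        fn name(&self) -> String;
        fn id(&self) -> &MemoryConsumerId;
        fn type_(&self) -> &ConsumerType;
        fn mem_used(&self) -> usize;
    }

    // An operator that needs grow/spill behaviour (e.g. ExternalSort) would
    // keep a Weak handle, so the two sides never form an Arc cycle.
    struct SpillableSortState {
        memory_manager: Weak<MemoryManager>,
    }

    impl SpillableSortState {
        fn manager(&self) -> Option<Arc<MemoryManager>> {
            // Upgrading fails once the runtime is gone, instead of the plan
            // keeping the manager alive forever.
            self.memory_manager.upgrade()
        }
    }
    ```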

##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution runtime environment that tracks memory, disk and various configurations
+//! that are used during physical plan execution.
+
+use crate::error::Result;
+use crate::execution::disk_manager::DiskManager;
+use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+/// Execution runtime environment
+pub struct RuntimeEnv {
+    /// Runtime configuration
+    pub config: RuntimeConfig,
+    /// Runtime memory management
+    pub memory_manager: Arc<MemoryManager>,
+    /// Manage temporary files during query execution
+    pub disk_manager: Arc<DiskManager>,
+    /// If runtime env has initialized
+    initialized: Arc<Mutex<bool>>,
+}
+
+impl Debug for RuntimeEnv {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RuntimeEnv")
+    }
+}
+
+impl RuntimeEnv {
+    /// Create env based on configuration
+    #[allow(clippy::mutex_atomic)]
+    pub fn new(config: RuntimeConfig) -> Result<Self> {
+        let memory_manager = Arc::new(MemoryManager::new(
+            (config.max_memory as f64 * config.memory_fraction) as usize,
+        ));
+        let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?);
+        Ok(Self {
+            config,
+            memory_manager,
+            disk_manager,
+            initialized: Arc::new(Mutex::new(false)),
+        })
+    }
+
+    /// Get execution batch size based on config
+    pub fn batch_size(&self) -> usize {
+        self.config.batch_size
+    }
+
+    /// Register the consumer to get it tracked
+    pub fn register_consumer(&self, memory_consumer: Arc<dyn MemoryConsumer>) {
+        {
+            let mut initialized = self.initialized.lock().unwrap();
+            if !*initialized {
+                self.memory_manager.initialize();
+                *initialized = true;
+            }
+        }
+        self.memory_manager.register_consumer(memory_consumer);
+    }
+
+    /// Drop the consumer from get tracked
+    pub fn drop_consumer(&self, id: &MemoryConsumerId) {
+        self.memory_manager.drop_consumer(id)
+    }
+}
+
+impl Default for RuntimeEnv {
+    fn default() -> Self {
+        RuntimeEnv::new(RuntimeConfig::new()).unwrap()
+    }
+}
+
+#[derive(Clone)]
+/// Execution runtime configuration
+pub struct RuntimeConfig {
+    /// Default batch size while creating new batches, it's especially useful
+    /// for buffer-in-memory batches since creating tiny batches would results
+    /// in too much metadata memory consumption.
+    pub batch_size: usize,
+    /// Max execution memory allowed for DataFusion.
+    pub max_memory: usize,
+    /// The fraction of total memory used for execution.
+    /// The purpose of this config is to set aside memory for untracked data structures,
+    /// and imprecise size estimation during memory acquisition.
+    pub memory_fraction: f64,
+    /// Local dirs to store temporary files during execution.
+    pub local_dirs: Vec<String>,
+}
+
+impl RuntimeConfig {
+    /// New with default values
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Customize batch size
+    pub fn with_batch_size(mut self, n: usize) -> Self {
+        // batch size must be greater than zero
+        assert!(n > 0);
+        self.batch_size = n;
+        self
+    }
+
+    /// Customize exec size
+    pub fn with_max_execution_memory(mut self, max_memory: usize) -> Self {
+        assert!(max_memory > 0);
+        self.max_memory = max_memory;
+        self
+    }
+
+    /// Customize exec memory fraction
+    pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
+        assert!(fraction > 0f64 && fraction < 1f64);
+        self.memory_fraction = fraction;
+        self
+    }
+
+    /// Customize exec size
+    pub fn with_local_dirs(mut self, local_dirs: Vec<String>) -> Self {
+        assert!(!local_dirs.is_empty());
+        self.local_dirs = local_dirs;
+        self
+    }
+}
+
+impl Default for RuntimeConfig {
+    fn default() -> Self {
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let path = tmp_dir.path().to_str().unwrap().to_string();
+        std::mem::forget(tmp_dir);
+
+        Self {
+            batch_size: 8192,
+            max_memory: usize::MAX,

Review comment:
       I agree with this default šŸ‘ 
   
   ```suggestion
               // Effectively "no limit"
               max_memory: usize::MAX,
   ```

##########
File path: datafusion/src/execution/disk_manager.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages files generated during query execution, files are
+//! hashed among the directories listed in RuntimeConfig::local_dirs.
+
+use crate::error::{DataFusionError, Result};
+use log::info;
+use std::collections::hash_map::DefaultHasher;
+use std::fs;
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::path::{Path, PathBuf};
+use uuid::Uuid;
+
+/// Manages files generated during query execution, e.g. spill files generated
+/// while processing dataset larger than available memory.
+pub struct DiskManager {
+    local_dirs: Vec<String>,
+}
+
+impl DiskManager {
+    /// Create local dirs inside user provided dirs through conf
+    pub fn new(conf_dirs: &[String]) -> Result<Self> {
+        let local_dirs = create_local_dirs(conf_dirs)?;
+        info!(
+            "Created local dirs {:?} as DataFusion working directory",
+            local_dirs
+        );
+        Ok(Self { local_dirs })
+    }
+
+    /// Create a file in conf dirs in randomized manner and return the file path
+    pub fn create_tmp_file(&self) -> Result<String> {
+        create_tmp_file(&self.local_dirs)
+    }
+
+    #[allow(dead_code)]
+    fn cleanup_resource(&mut self) -> Result<()> {

Review comment:
       This seems to be disconnected -- I don't see `cleanup_resource` called anywhere yet (hence the `#[allow(dead_code)]`)

##########
File path: datafusion/src/execution/runtime_env.rs
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution runtime environment that tracks memory, disk and various configurations
+//! that are used during physical plan execution.
+
+use crate::error::Result;
+use crate::execution::disk_manager::DiskManager;
+use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+/// Execution runtime environment
+pub struct RuntimeEnv {
+    /// Runtime configuration
+    pub config: RuntimeConfig,
+    /// Runtime memory management
+    pub memory_manager: Arc<MemoryManager>,
+    /// Manage temporary files during query execution
+    pub disk_manager: Arc<DiskManager>,
+    /// If runtime env has initialized
+    initialized: Arc<Mutex<bool>>,
+}
+
+impl Debug for RuntimeEnv {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "RuntimeEnv")
+    }
+}
+
+impl RuntimeEnv {
+    /// Create env based on configuration
+    #[allow(clippy::mutex_atomic)]
+    pub fn new(config: RuntimeConfig) -> Result<Self> {
+        let memory_manager = Arc::new(MemoryManager::new(
+            (config.max_memory as f64 * config.memory_fraction) as usize,
+        ));
+        let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?);
+        Ok(Self {
+            config,
+            memory_manager,
+            disk_manager,
+            initialized: Arc::new(Mutex::new(false)),
+        })
+    }
+
+    /// Get execution batch size based on config
+    pub fn batch_size(&self) -> usize {
+        self.config.batch_size
+    }
+
+    /// Register the consumer to get it tracked
+    pub fn register_consumer(&self, memory_consumer: Arc<dyn MemoryConsumer>) {
+        {
+            let mut initialized = self.initialized.lock().unwrap();
+            if !*initialized {
+                self.memory_manager.initialize();
+                *initialized = true;
+            }
+        }
+        self.memory_manager.register_consumer(memory_consumer);
+    }
+
+    /// Drop the consumer from get tracked
+    pub fn drop_consumer(&self, id: &MemoryConsumerId) {
+        self.memory_manager.drop_consumer(id)
+    }
+}
+
+impl Default for RuntimeEnv {
+    fn default() -> Self {
+        RuntimeEnv::new(RuntimeConfig::new()).unwrap()
+    }
+}
+
+#[derive(Clone)]
+/// Execution runtime configuration
+pub struct RuntimeConfig {
+    /// Default batch size while creating new batches, it's especially useful
+    /// for buffer-in-memory batches since creating tiny batches would results
+    /// in too much metadata memory consumption.
+    pub batch_size: usize,
+    /// Max execution memory allowed for DataFusion.
+    pub max_memory: usize,
+    /// The fraction of total memory used for execution.
+    /// The purpose of this config is to set aside memory for untracked data structures,
+    /// and imprecise size estimation during memory acquisition.

Review comment:
       ```suggestion
       /// and imprecise size estimation during memory acquisition.
       /// Defaults to 0.7
   ```

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// consumers that can grow or shrink its memory usage during execution
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// consumers that are not spillable, counting in for only tracking purpose.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory,
+    /// this may trigger spill before grow when the memory threshold is
+    /// reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. User designates max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1).
+   The actual max memory DataFusion could use `pool_size =  max_memory * memory_fraction`.
+2. The entities that take up memory during its execution are called 'Memory Consumers'. Operators or others are encouraged to
+   register themselves to the memory manager and report its usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that would acquire memory during its execution and release memory through `spill` if no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer could acquire a maximum of
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {
+            let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
+            loop {
+                interval_timer.tick().await;
+                manager.update_tracker_total();
+            }
+        });
+        let _ = self.join_handle.lock().unwrap().insert(handle);
+    }
+
+    /// Register a new memory consumer for memory usage tracking
+    pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
+        let id = consumer.id().clone();
+        match consumer.type_() {
+            ConsumerType::Controlling => {
+                let mut consumers = self.consumers.lock().unwrap();
+                consumers.insert(id, consumer);
+            }
+            ConsumerType::Tracking => {
+                let mut trackers = self.trackers.lock().unwrap();
+                trackers.insert(id, consumer);
+            }
+        }
+    }
+
+    /// Grow memory attempt from a consumer, return if we could grant that much to it
+    async fn try_grow(
+        self: &Arc<Self>,
+        required: usize,
+        current: usize,
+        consumer_id: &MemoryConsumerId,
+    ) -> bool {
+        let max_per_op = {
+            let total_available =
+                self.pool_size - self.trackers_total_usage.load(Ordering::SeqCst);
+            let ops = self.consumers.lock().unwrap().len();
+            (total_available / ops) as usize
+        };
+        let granted = required + current < max_per_op;
+        info!(
+            "trying to acquire {} whiling holding {} from {}, got: {}",
+            human_readable_size(required),
+            human_readable_size(current),
+            consumer_id,
+            granted,
+        );
+        granted
+    }
+
+    /// Drop a memory consumer from memory usage tracking
+    pub(crate) fn drop_consumer(self: &Arc<Self>, id: &MemoryConsumerId) {
+        // find in consumers first
+        {
+            let mut consumers = self.consumers.lock().unwrap();
+            if consumers.contains_key(id) {
+                consumers.remove(id);
+                return;
+            }

Review comment:
       ```suggestion
               if let Some(_) = consumers.remove(id) {
                   return;
               }
   ```
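
    (Or, to keep clippy's `redundant_pattern_matching` lint quiet, the equivalent:)

    ```
    if consumers.remove(id).is_some() {
        return;
    }
    ```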

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// consumers that can grow or shrink its memory usage during execution
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// consumers that are not spillable, counting in for only tracking purpose.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory,
+    /// this may trigger spill before grow when the memory threshold is
+    /// reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. User designates max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1).
+   The actual max memory DataFusion could use `pool_size =  max_memory * memory_fraction`.
+2. The entities that take up memory during its execution are called 'Memory Consumers'. Operators or others are encouraged to
+   register themselves to the memory manager and report its usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that would acquire memory during its execution and release memory through `spill` if no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer could acquire a maximum of
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {
+            let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
+            loop {
+                interval_timer.tick().await;
+                manager.update_tracker_total();
+            }
+        });
+        let _ = self.join_handle.lock().unwrap().insert(handle);
+    }
+
+    /// Register a new memory consumer for memory usage tracking
+    pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {

Review comment:
       I didn't see any code that registered any Tracking consumers yet. 
   
   In terms of plumbing, what do you think about:
   1. making all `ExecutionPlans` `MemoryConsumers` and providing default implementations (that reported 0 usage)
   2. Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?
   
    That way all implementations of `ExecutionPlan` could report their usage without having to explicitly register themselves with the memory manager. Also, the manager could report on how many operators were not providing any statistics, etc.
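
    A rough sketch of idea 1 (hypothetical trait and method names, not from this PR): default methods so a plan that tracks nothing still participates and is countable as "not reporting":

    ```
    // Hypothetical default-usage trait: every ExecutionPlan could expose
    // this; ones that never override the defaults report zero usage.
    trait PlanMemoryUsage {
        /// Bytes currently held by this operator.
        fn mem_used(&self) -> usize {
            0
        }

        /// Whether the operator actually implements accounting, so the
        /// manager can report how many operators provide no statistics.
        fn reports_usage(&self) -> bool {
            false
        }
    }
    ```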

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// consumers that can grow or shrink its memory usage during execution
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// consumers that are not spillable, counting in for only tracking purpose.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory,
+    /// this may trigger spill before grow when the memory threshold is
+    /// reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. User designates max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1).
+   The actual max memory DataFusion could use `pool_size =  max_memory * memory_fraction`.
+2. The entities that take up memory during its execution are called 'Memory Consumers'. Operators or others are encouraged to
+   register themselves to the memory manager and report its usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that would acquire memory during its execution and release memory through `spill` if no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer could acquire a maximum of
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {
+            let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
+            loop {
+                interval_timer.tick().await;
+                manager.update_tracker_total();
+            }
+        });
+        let _ = self.join_handle.lock().unwrap().insert(handle);
+    }
+
+    /// Register a new memory consumer for memory usage tracking
+    pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
+        let id = consumer.id().clone();
+        match consumer.type_() {
+            ConsumerType::Controlling => {
+                let mut consumers = self.consumers.lock().unwrap();
+                consumers.insert(id, consumer);
+            }
+            ConsumerType::Tracking => {
+                let mut trackers = self.trackers.lock().unwrap();
+                trackers.insert(id, consumer);
+            }
+        }
+    }
+
+    /// Handle a grow-memory attempt from a consumer; returns whether we could grant that much to it
+    async fn try_grow(
+        self: &Arc<Self>,
+        required: usize,
+        current: usize,
+        consumer_id: &MemoryConsumerId,
+    ) -> bool {
+        let max_per_op = {
+            let total_available =
+                self.pool_size - self.trackers_total_usage.load(Ordering::SeqCst);
+            let ops = self.consumers.lock().unwrap().len();
+            (total_available / ops) as usize
+        };
+        let granted = required + current < max_per_op;
+        info!(
+            "trying to acquire {} whiling holding {} from {}, got: {}",
+            human_readable_size(required),
+            human_readable_size(current),
+            consumer_id,
+            granted,
+        );
+        granted
+    }
+
+    /// Drop a memory consumer from memory usage tracking
+    pub(crate) fn drop_consumer(self: &Arc<Self>, id: &MemoryConsumerId) {
+        // find in consumers first
+        {
+            let mut consumers = self.consumers.lock().unwrap();
+            if consumers.contains_key(id) {
+                consumers.remove(id);
+                return;
+            }
+        }
+        {
+            let mut trackers = self.trackers.lock().unwrap();
+            if trackers.contains_key(id) {
+                let removed = trackers.remove(id);
+                match removed {
+                    None => {}
+                    Some(tracker) => {
+                        let usage = tracker.mem_used();
+                        self.trackers_total_usage.fetch_sub(usage, Ordering::SeqCst);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Shutdown
+    pub(crate) fn shutdown(&mut self) {
+        let maybe_handle = self.join_handle.lock().unwrap().take();
+        match maybe_handle {
+            None => {}
+            Some(handle) => handle.abort(),
+        }
+    }
+
+    #[allow(dead_code)]

Review comment:
       Recommend making this `impl Display for MemoryManager` instead
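
   For readers following that recommendation, a minimal sketch of what the `Display` impl could look like, reusing only the fields and the `human_readable_size` helper already visible in this diff (the exact output format is illustrative, not from the PR):

   ```rust
   impl Display for MemoryManager {
       fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
           // Summarize the pool size and how many consumers of each kind are registered.
           let num_controlling = self.consumers.lock().unwrap().len();
           let num_tracking = self.trackers.lock().unwrap().len();
           write!(
               f,
               "MemoryManager: pool {}, {} controlling consumer(s), {} tracking consumer(s)",
               human_readable_size(self.pool_size),
               num_controlling,
               num_tracking
           )
       }
   }
   ```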

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// Consumers that can grow or shrink their memory usage during execution,

Review comment:
       I think the two categories `controlling` and `tracking` make sense to me
   
   It seems like the key difference is that `controlling` consumers work with the resource manager to limit their resource usage (e.g. ask for more memory, spill to disk, etc.), whereas `tracking` consumers just report their usage to the resource manager.
   
   Maybe renaming `Controlling` -->  `Requesting` would make it clearer?
   
   But I think `Controlling` is also fine with some documentation clarification
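
   Purely as an illustration of that clarification, a sketch of how the enum might read with the suggested `Requesting` name and expanded doc comments (the PR itself keeps `Controlling`):

   ```rust
   /// Type of the memory consumer
   pub enum ConsumerType {
       /// Consumers that negotiate with the memory manager: they request more
       /// memory before buffering additional data and spill to disk when the
       /// request cannot be granted (e.g. a spillable sorter or hash map).
       Requesting,
       /// Consumers that cannot spill; they only report their usage so that
       /// the manager has an accurate view of overall memory consumption.
       Tracking,
   }
   ```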

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// Consumers that can grow or shrink their memory usage during execution,
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// Consumers that are not spillable, counted only for tracking purposes.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory;
+    /// this may trigger a spill before growing when the memory threshold
+    /// is reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. The user designates a maximum execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (a float64 between 0 and 1).
+   The actual maximum memory DataFusion can use is `pool_size = max_memory * memory_fraction`.
+2. The entities that take up memory during their execution are called 'Memory Consumers'. Operators and others are encouraged to
+   register themselves with the memory manager and report their usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that acquire memory during their execution and release memory through `spill` when no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes, providing a more accurate memory usage estimation for the controlling consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer can acquire at most
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {
+            let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
+            loop {
+                interval_timer.tick().await;
+                manager.update_tracker_total();
+            }
+        });
+        let _ = self.join_handle.lock().unwrap().insert(handle);
+    }
+
+    /// Register a new memory consumer for memory usage tracking
+    pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
+        let id = consumer.id().clone();
+        match consumer.type_() {
+            ConsumerType::Controlling => {
+                let mut consumers = self.consumers.lock().unwrap();

Review comment:
       I left some comments above, but I also think `Controlling` and `Tracking` are fine
   
   Maybe using `Requesting` instead of `Controlling` would be clearer

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// Consumers that can grow or shrink their memory usage during execution,
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// Consumers that are not spillable, counted only for tracking purposes.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory;
+    /// this may trigger a spill before growing when the memory threshold
+    /// is reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. The user designates a maximum execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (a float64 between 0 and 1).
+   The actual maximum memory DataFusion can use is `pool_size = max_memory * memory_fraction`.
+2. The entities that take up memory during their execution are called 'Memory Consumers'. Operators and others are encouraged to
+   register themselves with the memory manager and report their usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that acquire memory during their execution and release memory through `spill` when no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes, providing a more accurate memory usage estimation for the controlling consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer can acquire at most
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,

Review comment:
       As mentioned above, I think there is a refcount cycle here
   
   Specifically, `ExternalSort` has a `RuntimeEnv`, which has an `Arc<MemoryManager>`, but the `MemoryManager` has an `Arc` back to the `ExternalSort`.
   
   Thus I suggest changing this and the trackers below to be
   
   ```suggestion
       consumers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
   ```
   
   to break the cycle
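
   As a sketch of how the bookkeeping could adapt to `Weak` references under that suggestion: registration would store `Arc::downgrade(&consumer)`, and any traversal would skip consumers whose strong references have already been dropped (names follow the diff above):

   ```rust
   fn update_tracker_total(self: &Arc<Self>) {
       let trackers = self.trackers.lock().unwrap();
       // Upgrade each Weak reference; consumers that have already been
       // dropped simply stop contributing to the cached total.
       let sum: usize = trackers
           .values()
           .filter_map(|weak| weak.upgrade())
           .map(|consumer| consumer.mem_used())
           .sum();
       drop(trackers);
       self.trackers_total_usage.store(sum, Ordering::SeqCst);
   }
   ```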

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Manages all available memory during query execution
+
+use crate::error::Result;
+use async_trait::async_trait;
+use hashbrown::HashMap;
+use log::info;
+use std::fmt;
+use std::fmt::{Debug, Display, Formatter};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::task;
+use tokio::task::JoinHandle;
+
+static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn next_id() -> usize {
+    unsafe { CONSUMER_ID.fetch_add(1, Ordering::SeqCst) }
+}
+
+/// Type of the memory consumer
+pub enum ConsumerType {
+    /// Consumers that can grow or shrink their memory usage during execution,
+    /// such as spillable sorter, spillable hashmap, etc.
+    Controlling,
+    /// Consumers that are not spillable, counted only for tracking purposes.
+    Tracking,
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+/// Id that uniquely identifies a Memory Consumer
+pub struct MemoryConsumerId {
+    /// partition the consumer belongs to
+    pub partition_id: usize,
+    /// unique id
+    pub id: usize,
+}
+
+impl MemoryConsumerId {
+    /// Auto incremented new Id
+    pub fn new(partition_id: usize) -> Self {
+        let id = next_id();
+        Self { partition_id, id }
+    }
+}
+
+impl Display for MemoryConsumerId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "{}:{}", self.partition_id, self.id)
+    }
+}
+
+#[async_trait]
+/// A memory consumer that either takes up memory (of type `ConsumerType::Tracking`)
+/// or grows/shrinks memory usage based on available memory (of type `ConsumerType::Controlling`).
+pub trait MemoryConsumer: Send + Sync + Debug {
+    /// Display name of the consumer
+    fn name(&self) -> String;
+
+    /// Unique id of the consumer
+    fn id(&self) -> &MemoryConsumerId;
+
+    /// Ptr to MemoryManager
+    fn memory_manager(&self) -> Arc<MemoryManager>;
+
+    /// Partition that the consumer belongs to
+    fn partition_id(&self) -> usize {
+        self.id().partition_id
+    }
+
+    /// Type of the consumer
+    fn type_(&self) -> &ConsumerType;
+
+    /// Grow memory by `required` to buffer more data in memory;
+    /// this may trigger a spill before growing when the memory threshold
+    /// is reached for this consumer.
+    async fn try_grow(&self, required: usize) -> Result<()> {
+        let current_usage = self.mem_used();
+        let can_grow = self
+            .memory_manager()
+            .try_grow(required, current_usage, self.id())
+            .await;
+        if !can_grow {
+            info!(
+                "Failed to grow memory of {} from {}, spilling...",
+                human_readable_size(required),
+                self.id()
+            );
+            self.spill().await?;
+        }
+        Ok(())
+    }
+
+    /// Spill in-memory buffers to disk, free memory
+    async fn spill(&self) -> Result<()>;
+
+    /// Current memory used by this consumer
+    fn mem_used(&self) -> usize;
+
+    /// Current status of the consumer
+    fn str_repr(&self) -> String {
+        let mem = self.mem_used();
+        format!(
+            "{}[{}]: {}",
+            self.name(),
+            self.id(),
+            human_readable_size(mem)
+        )
+    }
+}
+
+/*
+The memory management architecture is the following:
+
+1. The user designates a maximum execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (a float64 between 0 and 1).
+   The actual maximum memory DataFusion can use is `pool_size = max_memory * memory_fraction`.
+2. The entities that take up memory during their execution are called 'Memory Consumers'. Operators and others are encouraged to
+   register themselves with the memory manager and report their usage through `mem_used()`.
+3. There are two kinds of consumers:
+   - 'Controlling' consumers that acquire memory during their execution and release memory through `spill` when no more memory is available.
+   - 'Tracking' consumers that exist for reporting purposes, providing a more accurate memory usage estimation for the controlling consumers.
+4. Controlling and tracking consumers share the pool. Each controlling consumer can acquire at most
+   (pool_size - all_tracking_used) / active_num_controlling_consumers.
+
+            Memory Space for the DataFusion Lib / Process of `pool_size`
+   ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚               Controlling                    z          Tracking           ā”‚
+   ā”‚            Memory Consumers                  z       Memory Consumers      ā”‚
+   ā”‚                                              z                             ā”‚
+   ā”‚                                              z                             ā”‚
+   ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€zā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜
+*/
+
+/// Manage memory usage during physical plan execution
+pub struct MemoryManager {
+    consumers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Arc<dyn MemoryConsumer>>>>,
+    trackers_total_usage: AtomicUsize,
+    pool_size: usize,
+    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+}
+
+impl MemoryManager {
+    /// Create new memory manager based on max available pool_size
+    pub fn new(pool_size: usize) -> Self {
+        info!(
+            "Creating memory manager with initial size {}",
+            human_readable_size(pool_size)
+        );
+        Self {
+            consumers: Arc::new(Mutex::new(HashMap::new())),
+            trackers: Arc::new(Mutex::new(HashMap::new())),
+            trackers_total_usage: AtomicUsize::new(0),
+            pool_size,
+            join_handle: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    fn update_tracker_total(self: &Arc<Self>) {
+        let trackers = self.trackers.lock().unwrap();
+        if trackers.len() > 0 {
+            let sum = trackers.values().fold(0usize, |acc, y| acc + y.mem_used());
+            drop(trackers);
+            self.trackers_total_usage.store(sum, Ordering::SeqCst);
+        }
+    }
+
+    /// Initialize
+    pub(crate) fn initialize(self: &Arc<Self>) {
+        let manager = self.clone();
+        let handle = task::spawn(async move {
+            let mut interval_timer = tokio::time::interval(Duration::from_secs(60));
+            loop {
+                interval_timer.tick().await;
+                manager.update_tracker_total();
+            }
+        });
+        let _ = self.join_handle.lock().unwrap().insert(handle);
+    }
+
+    /// Register a new memory consumer for memory usage tracking
+    pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
+        let id = consumer.id().clone();
+        match consumer.type_() {
+            ConsumerType::Controlling => {
+                let mut consumers = self.consumers.lock().unwrap();
+                consumers.insert(id, consumer);
+            }
+            ConsumerType::Tracking => {
+                let mut trackers = self.trackers.lock().unwrap();
+                trackers.insert(id, consumer);
+            }
+        }
+    }
+
+    /// Handle a grow-memory attempt from a consumer; returns whether we could grant that much to it
+    async fn try_grow(
+        self: &Arc<Self>,
+        required: usize,
+        current: usize,
+        consumer_id: &MemoryConsumerId,
+    ) -> bool {
+        let max_per_op = {
+            let total_available =
+                self.pool_size - self.trackers_total_usage.load(Ordering::SeqCst);
+            let ops = self.consumers.lock().unwrap().len();
+            (total_available / ops) as usize
+        };
+        let granted = required + current < max_per_op;
+        info!(
+            "trying to acquire {} whiling holding {} from {}, got: {}",
+            human_readable_size(required),
+            human_readable_size(current),
+            consumer_id,
+            granted,
+        );
+        granted
+    }
+
+    /// Drop a memory consumer from memory usage tracking
+    pub(crate) fn drop_consumer(self: &Arc<Self>, id: &MemoryConsumerId) {
+        // find in consumers first
+        {
+            let mut consumers = self.consumers.lock().unwrap();
+            if consumers.contains_key(id) {
+                consumers.remove(id);
+                return;
+            }
+        }
+        {
+            let mut trackers = self.trackers.lock().unwrap();
+            if trackers.contains_key(id) {
+                let removed = trackers.remove(id);
+                match removed {
+                    None => {}
+                    Some(tracker) => {
+                        let usage = tracker.mem_used();
+                        self.trackers_total_usage.fetch_sub(usage, Ordering::SeqCst);
+                    }
+                }
+            }

Review comment:
       ```suggestion
            if let Some(tracker) = trackers.remove(id) {
                let usage = tracker.mem_used();
                self.trackers_total_usage.fetch_sub(usage, Ordering::SeqCst);
            }
   ```
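
   For context, `drop_consumer` with that suggestion applied would read roughly as follows (a sketch only; field names are those from the diff above):

   ```rust
   /// Drop a memory consumer from memory usage tracking
   pub(crate) fn drop_consumer(self: &Arc<Self>, id: &MemoryConsumerId) {
       // Controlling consumers are simply removed from their map.
       if self.consumers.lock().unwrap().remove(id).is_some() {
           return;
       }
       // For trackers, also subtract the reported usage from the cached total.
       if let Some(tracker) = self.trackers.lock().unwrap().remove(id) {
           self.trackers_total_usage
               .fetch_sub(tracker.mem_used(), Ordering::SeqCst);
       }
   }
   ```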




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org