Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/04 18:48:14 UTC

[arrow-datafusion] branch master updated: fix: incorrect memory usage tracking for sort (#2135)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4a835bd fix: incorrect memory usage tracking for sort (#2135)
2a4a835bd is described below

commit 2a4a835bd727b00c58631a1c807bb598a0a12a93
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Tue Apr 5 02:48:09 2022 +0800

    fix: incorrect memory usage tracking for sort (#2135)
    
    * fix: incorrect memory usage tracking for sort
    
    * tests
---
 datafusion/core/src/execution/memory_manager.rs | 37 +++++++++++++++++++++----
 datafusion/core/src/physical_plan/sorts/sort.rs | 37 +++++++++++++++++++++++--
 datafusion/core/tests/order_spill_fuzz.rs       |  8 ++++++
 3 files changed, 75 insertions(+), 7 deletions(-)
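
For context on the change below: before this commit, the sort operator reset its own mem_used metric to zero once it had produced its output, but it never reported those freed bytes back to the shared MemoryManager, so the manager's requesters_total stayed inflated and could make other consumers wait or spill unnecessarily. The toy model below is only a sketch in plain Rust (ToyMemoryManager is a made-up stand-in, not DataFusion's MemoryManager) of why the shared accounting drifts unless the consumer also reports the free, which is what the new shrink/record_free path in the diff does:

    use std::sync::Mutex;

    /// Made-up stand-in for the manager's requester accounting: it only
    /// tracks the total bytes currently claimed by requesting consumers.
    struct ToyMemoryManager {
        requesters_total: Mutex<usize>,
    }

    impl ToyMemoryManager {
        fn acquire(&self, bytes: usize) {
            *self.requesters_total.lock().unwrap() += bytes;
        }

        /// Mirrors the spirit of the `record_free` added in this commit:
        /// subtract the freed bytes from the shared total.
        fn record_free(&self, freed: usize) {
            let mut total = self.requesters_total.lock().unwrap();
            assert!(*total >= freed);
            *total -= freed;
        }

        fn total(&self) -> usize {
            *self.requesters_total.lock().unwrap()
        }
    }

    fn main() {
        let mgr = ToyMemoryManager { requesters_total: Mutex::new(0) };

        // A sort-like consumer acquires memory for its in-memory batches.
        mgr.acquire(1024);

        // Before the fix: the consumer only reset its own metric
        // (metrics.mem_used().set(0)), so the shared total still showed
        // 1024 bytes in use.
        assert_eq!(mgr.total(), 1024);

        // After the fix: the consumer also reports the freed bytes back,
        // as the new `shrink` -> `record_free` path in the diff does.
        mgr.record_free(1024);
        assert_eq!(mgr.total(), 0);
    }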

diff --git a/datafusion/core/src/execution/memory_manager.rs b/datafusion/core/src/execution/memory_manager.rs
index e48585ea2..4e6a61d02 100644
--- a/datafusion/core/src/execution/memory_manager.rs
+++ b/datafusion/core/src/execution/memory_manager.rs
@@ -195,6 +195,12 @@ pub trait MemoryConsumer: Send + Sync {
         Ok(())
     }
 
+    /// Return `freed` memory to the memory manager,
+    /// which may wake up other requesters waiting for their minimum memory quota.
+    fn shrink(&self, freed: usize) {
+        self.memory_manager().record_free(freed);
+    }
+
     /// Spill in-memory buffers to disk, free memory, and return the amount of memory previously used
     async fn spill(&self) -> Result<usize>;
 
@@ -303,7 +309,8 @@ impl MemoryManager {
         ));
     }
 
-    fn get_requester_total(&self) -> usize {
+    /// Return the total memory usage for all requesters
+    pub fn get_requester_total(&self) -> usize {
         *self.requesters_total.lock()
     }
 
@@ -342,8 +349,8 @@ impl MemoryManager {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling ourselves frequently with limited total mem
                 debug!(
-                    "Cannot acquire minimum amount of memory {} on memory manager {}, waiting for others to spill ...",
-                    human_readable_size(min_per_rqt), self);
+                    "Cannot acquire a minimum amount of {} memory from the manager of total {}, waiting for others to spill ...",
+                    human_readable_size(min_per_rqt), human_readable_size(self.pool_size));
                 let now = Instant::now();
                 self.cv.wait(&mut rqt_current_used);
                 let elapsed = now.elapsed();
@@ -361,12 +368,30 @@ impl MemoryManager {
         granted
     }
 
-    fn record_free_then_acquire(&self, freed: usize, acquired: usize) -> usize {
+    fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
         let mut requesters_total = self.requesters_total.lock();
+        debug!(
+            "free_then_acquire: total {}, freed {}, acquired {}",
+            human_readable_size(*requesters_total),
+            human_readable_size(freed),
+            human_readable_size(acquired)
+        );
         assert!(*requesters_total >= freed);
         *requesters_total -= freed;
         *requesters_total += acquired;
-        self.cv.notify_all()
+        self.cv.notify_all();
+    }
+
+    fn record_free(&self, freed: usize) {
+        let mut requesters_total = self.requesters_total.lock();
+        debug!(
+            "free: total {}, freed {}",
+            human_readable_size(*requesters_total),
+            human_readable_size(freed)
+        );
+        assert!(*requesters_total >= freed);
+        *requesters_total -= freed;
+        self.cv.notify_all();
     }
 
     /// Drop a memory consumer and reclaim the memory
@@ -378,6 +403,8 @@ impl MemoryManager {
                 let mut total = self.requesters_total.lock();
                 assert!(*total >= mem_used);
                 *total -= mem_used;
+                self.cv.notify_all();
+                return;
             }
         }
         self.shrink_tracker_usage(mem_used);
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 67a6e5fec..69779ac90 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -138,7 +138,7 @@ impl ExternalSorter {
                     &self.expr,
                     tracking_metrics,
                 )?;
-                let prev_used = self.metrics.mem_used().set(0);
+                let prev_used = self.free_all_memory();
                 streams.push(SortedStream::new(in_mem_stream, prev_used));
             }
 
@@ -169,13 +169,19 @@ impl ExternalSorter {
                 tracking_metrics,
             );
             // Report to the memory manager we are no longer using memory
-            self.metrics.mem_used().set(0);
+            self.free_all_memory();
             result
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
     }
 
+    fn free_all_memory(&self) -> usize {
+        let used = self.metrics.mem_used().set(0);
+        self.shrink(used);
+        used
+    }
+
     fn used(&self) -> usize {
         self.metrics.mem_used().value()
     }
@@ -678,6 +684,15 @@ mod tests {
         assert_eq!(c7.value(0), 15);
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
+        assert_eq!(
+            session_ctx
+                .runtime_env()
+                .memory_manager
+                .get_requester_total(),
+            0,
+            "The sort should have returned all memory used back to the memory manager"
+        );
+
         Ok(())
     }
 
@@ -755,6 +770,15 @@ mod tests {
         assert_eq!(c7.value(0), 15);
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
+        assert_eq!(
+            session_ctx
+                .runtime_env()
+                .memory_manager
+                .get_requester_total(),
+            0,
+            "The sort should have returned all memory used back to the memory manager"
+        );
+
         Ok(())
     }
 
@@ -941,6 +965,15 @@ mod tests {
         drop(fut);
         assert_strong_count_converges_to_zero(refs).await;
 
+        assert_eq!(
+            session_ctx
+                .runtime_env()
+                .memory_manager
+                .get_requester_total(),
+            0,
+            "The sort should have returned all memory used back to the memory manager"
+        );
+
         Ok(())
     }
 }
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index c052382d5..4545cfa80 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -93,6 +93,14 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
             assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
         }
 
+        assert_eq!(
+            session_ctx
+                .runtime_env()
+                .memory_manager
+                .get_requester_total(),
+            0,
+            "The sort should have returned all memory used back to the memory manager"
+        );
         assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
     }
 }
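
Usage note: since get_requester_total() is now public, the check used by the new tests above can also be written against any configured runtime. A minimal sketch, assuming a DataFusion SessionContext named session_ctx is already set up and a query has finished (it mirrors the assertion added in the tests and is not runnable on its own):

    // `session_ctx` is assumed to be an existing, configured SessionContext.
    let still_held = session_ctx
        .runtime_env()
        .memory_manager
        .get_requester_total();
    assert_eq!(
        still_held, 0,
        "all sort memory should have been returned to the memory manager"
    );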