Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/04 18:48:14 UTC
[arrow-datafusion] branch master updated: fix: incorrect memory usage track for sort (#2135)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2a4a835bd fix: incorrect memory usage track for sort (#2135)
2a4a835bd is described below
commit 2a4a835bd727b00c58631a1c807bb598a0a12a93
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Tue Apr 5 02:48:09 2022 +0800
fix: incorrect memory usage track for sort (#2135)
* fix: incorrect memory usage track for sort
* tests
---
datafusion/core/src/execution/memory_manager.rs | 37 +++++++++++++++++++++----
datafusion/core/src/physical_plan/sorts/sort.rs | 37 +++++++++++++++++++++++--
datafusion/core/tests/order_spill_fuzz.rs | 8 ++++++
3 files changed, 75 insertions(+), 7 deletions(-)
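For context: when a sort finished, ExternalSorter reset its own mem_used metric but never returned those bytes to the MemoryManager, so the manager's requesters_total stayed inflated and later consumers could be needlessly forced to wait or spill. A minimal, self-contained sketch of the accounting involved (toy types, not DataFusion's actual structs) is shown below:

    use std::sync::Mutex;

    /// Simplified stand-in for DataFusion's MemoryManager; it only tracks the
    /// total bytes currently attributed to memory-consuming operators.
    struct ToyMemoryManager {
        requesters_total: Mutex<usize>,
    }

    impl ToyMemoryManager {
        fn record_acquire(&self, bytes: usize) {
            *self.requesters_total.lock().unwrap() += bytes;
        }

        /// Counterpart of the `record_free` added by this commit: give the bytes
        /// back so the pool can be granted to other requesters again.
        fn record_free(&self, bytes: usize) {
            let mut total = self.requesters_total.lock().unwrap();
            assert!(*total >= bytes);
            *total -= bytes;
        }
    }

    fn main() {
        let mm = ToyMemoryManager { requesters_total: Mutex::new(0) };

        // A sort-like consumer grows to 1 MiB while buffering batches.
        mm.record_acquire(1024 * 1024);

        // Before this fix, the sort only reset its local gauge
        // (`self.metrics.mem_used().set(0)`) and never told the manager,
        // so `requesters_total` stayed at 1 MiB after the sort finished.
        // After the fix, `shrink(used)` forwards the freed bytes here:
        mm.record_free(1024 * 1024);

        assert_eq!(*mm.requesters_total.lock().unwrap(), 0);
    }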
diff --git a/datafusion/core/src/execution/memory_manager.rs b/datafusion/core/src/execution/memory_manager.rs
index e48585ea2..4e6a61d02 100644
--- a/datafusion/core/src/execution/memory_manager.rs
+++ b/datafusion/core/src/execution/memory_manager.rs
@@ -195,6 +195,12 @@ pub trait MemoryConsumer: Send + Sync {
Ok(())
}
+ /// Return `freed` memory to the memory manager; this may wake up other
+ /// requesters waiting for their minimum memory quota.
+ fn shrink(&self, freed: usize) {
+ self.memory_manager().record_free(freed);
+ }
+
/// Spill in-memory buffers to disk, free memory, and return the amount previously used
async fn spill(&self) -> Result<usize>;
@@ -303,7 +309,8 @@ impl MemoryManager {
));
}
- fn get_requester_total(&self) -> usize {
+ /// Return the total memory usage for all requesters
+ pub fn get_requester_total(&self) -> usize {
*self.requesters_total.lock()
}
@@ -342,8 +349,8 @@ impl MemoryManager {
// if we cannot acquire at least 1/2n memory, just wait for others
// to spill instead of spilling ourselves frequently with limited total mem
debug!(
- "Cannot acquire minimum amount of memory {} on memory manager {}, waiting for others to spill ...",
- human_readable_size(min_per_rqt), self);
+ "Cannot acquire a minimum amount of {} memory from the manager of total {}, waiting for others to spill ...",
+ human_readable_size(min_per_rqt), human_readable_size(self.pool_size));
let now = Instant::now();
self.cv.wait(&mut rqt_current_used);
let elapsed = now.elapsed();
@@ -361,12 +368,30 @@ impl MemoryManager {
granted
}
- fn record_free_then_acquire(&self, freed: usize, acquired: usize) -> usize {
+ fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
let mut requesters_total = self.requesters_total.lock();
+ debug!(
+ "free_then_acquire: total {}, freed {}, acquired {}",
+ human_readable_size(*requesters_total),
+ human_readable_size(freed),
+ human_readable_size(acquired)
+ );
assert!(*requesters_total >= freed);
*requesters_total -= freed;
*requesters_total += acquired;
- self.cv.notify_all()
+ self.cv.notify_all();
+ }
+
+ fn record_free(&self, freed: usize) {
+ let mut requesters_total = self.requesters_total.lock();
+ debug!(
+ "free: total {}, freed {}",
+ human_readable_size(*requesters_total),
+ human_readable_size(freed)
+ );
+ assert!(*requesters_total >= freed);
+ *requesters_total -= freed;
+ self.cv.notify_all();
}
/// Drop a memory consumer and reclaim the memory
@@ -378,6 +403,8 @@ impl MemoryManager {
let mut total = self.requesters_total.lock();
assert!(*total >= mem_used);
*total -= mem_used;
+ self.cv.notify_all();
+ return;
}
}
self.shrink_tracker_usage(mem_used);
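The memory_manager.rs side of the change: record_free must notify the condition variable, because a requester that cannot obtain its 1/2n minimum parks on self.cv and waits for someone else to release memory. A rough, self-contained sketch of that wait/notify handshake (std primitives and made-up types, not DataFusion's real API) follows:

    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    /// Toy pool illustrating the wait/notify handshake; the real MemoryManager
    /// uses parking_lot primitives and a fair per-requester (1/2n) policy.
    struct ToyPool {
        pool_size: usize,
        used: Mutex<usize>,
        cv: Condvar,
    }

    impl ToyPool {
        /// Block until `bytes` fits in the pool, then take it
        /// (roughly what the `self.cv.wait(..)` loop in the diff does).
        fn acquire(&self, bytes: usize) {
            let mut used = self.used.lock().unwrap();
            while *used + bytes > self.pool_size {
                used = self.cv.wait(used).unwrap();
            }
            *used += bytes;
        }

        /// Analogous to the new `record_free`: return memory and wake waiters.
        fn free(&self, bytes: usize) {
            let mut used = self.used.lock().unwrap();
            assert!(*used >= bytes);
            *used -= bytes;
            self.cv.notify_all();
        }
    }

    fn main() {
        let pool = Arc::new(ToyPool {
            pool_size: 100,
            used: Mutex::new(0),
            cv: Condvar::new(),
        });
        pool.acquire(100);

        let waiter = {
            let pool = Arc::clone(&pool);
            thread::spawn(move || pool.acquire(50)) // blocks until memory is freed
        };

        pool.free(60); // without the notify_all, the waiting thread would hang
        waiter.join().unwrap();
        assert_eq!(*pool.used.lock().unwrap(), 90);
    }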
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 67a6e5fec..69779ac90 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -138,7 +138,7 @@ impl ExternalSorter {
&self.expr,
tracking_metrics,
)?;
- let prev_used = self.metrics.mem_used().set(0);
+ let prev_used = self.free_all_memory();
streams.push(SortedStream::new(in_mem_stream, prev_used));
}
@@ -169,13 +169,19 @@ impl ExternalSorter {
tracking_metrics,
);
// Report to the memory manager that we are no longer using memory
- self.metrics.mem_used().set(0);
+ self.free_all_memory();
result
} else {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
}
}
+ fn free_all_memory(&self) -> usize {
+ let used = self.metrics.mem_used().set(0);
+ self.shrink(used);
+ used
+ }
+
fn used(&self) -> usize {
self.metrics.mem_used().value()
}
@@ -678,6 +684,15 @@ mod tests {
assert_eq!(c7.value(0), 15);
assert_eq!(c7.value(c7.len() - 1), 254,);
+ assert_eq!(
+ session_ctx
+ .runtime_env()
+ .memory_manager
+ .get_requester_total(),
+ 0,
+ "The sort should have returned all memory used back to the memory manager"
+ );
+
Ok(())
}
@@ -755,6 +770,15 @@ mod tests {
assert_eq!(c7.value(0), 15);
assert_eq!(c7.value(c7.len() - 1), 254,);
+ assert_eq!(
+ session_ctx
+ .runtime_env()
+ .memory_manager
+ .get_requester_total(),
+ 0,
+ "The sort should have returned all memory used back to the memory manager"
+ );
+
Ok(())
}
@@ -941,6 +965,15 @@ mod tests {
drop(fut);
assert_strong_count_converges_to_zero(refs).await;
+ assert_eq!(
+ session_ctx
+ .runtime_env()
+ .memory_manager
+ .get_requester_total(),
+ 0,
+ "The sort should have returned all memory used back to the memory manager"
+ );
+
Ok(())
}
}
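In sort.rs, both "sorting finished" paths (the spill-merge path and the in-memory path) now go through one helper, so resetting the local metric and shrinking the manager's reservation always happen together; the new test assertions then check that get_requester_total() drops back to 0. A simplified sketch of that helper, using assumed toy types rather than the real ExternalSorter and metrics:

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Stand-in for the `mem_used` gauge in the sort metrics.
    struct Gauge(AtomicUsize);

    impl Gauge {
        /// Set the gauge and return the previous value, mirroring
        /// `self.metrics.mem_used().set(0)` in the diff above.
        fn set(&self, new: usize) -> usize {
            self.0.swap(new, Ordering::SeqCst)
        }
    }

    /// Stand-in for the manager side of `shrink` / `record_free`.
    struct Manager {
        requesters_total: AtomicUsize,
    }

    impl Manager {
        fn record_free(&self, bytes: usize) {
            self.requesters_total.fetch_sub(bytes, Ordering::SeqCst);
        }
    }

    /// Stand-in for ExternalSorter.
    struct ToySorter<'a> {
        mem_used: Gauge,
        manager: &'a Manager,
    }

    impl ToySorter<'_> {
        /// Same shape as the new `free_all_memory`: zero the local metric and
        /// hand those bytes back to the manager in one place, so the two
        /// pieces of bookkeeping can no longer drift apart.
        fn free_all_memory(&self) -> usize {
            let used = self.mem_used.set(0);
            self.manager.record_free(used); // the call that was missing before
            used
        }
    }

    fn main() {
        let manager = Manager {
            requesters_total: AtomicUsize::new(4096),
        };
        let sorter = ToySorter {
            mem_used: Gauge(AtomicUsize::new(4096)),
            manager: &manager,
        };

        assert_eq!(sorter.free_all_memory(), 4096);
        assert_eq!(manager.requesters_total.load(Ordering::SeqCst), 0);
    }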
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index c052382d5..4545cfa80 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -93,6 +93,14 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
}
+ assert_eq!(
+ session_ctx
+ .runtime_env()
+ .memory_manager
+ .get_requester_total(),
+ 0,
+ "The sort should have returned all memory used back to the memory manager"
+ );
assert_eq!(expected, actual, "failure @ pool_size {}", pool_size);
}
}
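The fuzz test gains the same end-of-test invariant. Since that assertion now appears in several tests, a hypothetical follow-up (not part of this commit) could hoist it into a small shared helper, sketched here:

    /// Hypothetical helper, not part of this commit: assert that a finished
    /// query handed every reserved byte back to the memory manager.
    fn assert_no_leaked_memory(requester_total: usize) {
        assert_eq!(
            requester_total, 0,
            "The sort should have returned all memory used back to the memory manager"
        );
    }

    fn main() {
        // In the real tests this value would come from
        // `session_ctx.runtime_env().memory_manager.get_requester_total()`.
        let requester_total = 0;
        assert_no_leaked_memory(requester_total);
    }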