Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/28 10:40:48 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

yjshen opened a new pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691


   # Which issue does this PR close?
   
   Closes #1569.
   
   # Rationale for this change
   
   The kinds of requesting memory consumers are quite limited. As shown in #587, we only have 3 or 4 types of requesting memory consumers (join / sort / agg / repartition). All other consumers that take a non-negligible amount of memory are considered tracking consumers.
   
   Tracking consumers almost always follow a fixed pattern of memory usage: they claim some memory, use it during execution, and free it when finished. Growing or shrinking memory usage during execution is rare.
   
   Considering the potentially large number of tracking consumers and the simple use case, we should have a simple way to do this kind of tracking; therefore, `MemTrackingMetrics` is proposed in this PR.
   
   # What changes are included in this PR?
   
   1. Introduce `MemTrackingMetrics`, which acts like `BaselineMetrics` but also reports memory usage via `init_mem_used` and frees that memory when it is dropped.
   2. `MemoryManager` no longer stores weak references to any consumers. Registration of memory consumers is simplified as well.
   3. Consumers push their memory usage to `MemoryManager`; the manager no longer pulls usage updates from consumers.
   4. Use `MemTrackingMetrics` in `SortPreservingMergeStream` and `SizedRecordBatchStream` to simplify the tracking logic.
   5. Rename `AggregatedMetricsSet` to `CompositeMetricsSet` and fix start/end time aggregation.
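   
   Points 1 and 3 above can be sketched as follows. This is a minimal sketch, not the code in this PR: `ToyManager` and `ToyTracker` are hypothetical stand-ins for `MemoryManager` and `MemTrackingMetrics`, and the manager is reduced to a single atomic counter. The tracker pushes its usage to the manager in `init_mem_used` and gives it back in `Drop`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the memory manager: one shared counter.
#[derive(Debug, Default)]
pub struct ToyManager {
    trackers_total: AtomicUsize,
}

impl ToyManager {
    pub fn tracker_total(&self) -> usize {
        self.trackers_total.load(Ordering::SeqCst)
    }

    fn grow(&self, delta: usize) {
        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
    }

    fn shrink(&self, delta: usize) {
        // Saturate at zero instead of wrapping if the bookkeeping is ever off.
        let _ = self
            .trackers_total
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                Some(x.saturating_sub(delta))
            });
    }
}

/// Hypothetical stand-in for `MemTrackingMetrics`.
pub struct ToyTracker {
    manager: Arc<ToyManager>,
    mem_used: AtomicUsize,
}

impl ToyTracker {
    pub fn new(manager: Arc<ToyManager>) -> Self {
        Self {
            manager,
            mem_used: AtomicUsize::new(0),
        }
    }

    /// Record the initial usage and push it to the manager.
    pub fn init_mem_used(&self, size: usize) {
        self.mem_used.store(size, Ordering::SeqCst);
        self.manager.grow(size);
    }
}

impl Drop for ToyTracker {
    fn drop(&mut self) {
        // Give the memory back when the tracker goes out of scope.
        let used = self.mem_used.load(Ordering::SeqCst);
        if used != 0 {
            self.manager.shrink(used);
        }
    }
}

/// Returns the manager total while a tracker is alive and after it is dropped.
pub fn tracked_totals() -> (usize, usize) {
    let manager = Arc::new(ToyManager::default());
    let during = {
        let tracker = ToyTracker::new(manager.clone());
        tracker.init_mem_used(15);
        manager.tracker_total()
    }; // `tracker` is dropped here
    (during, manager.tracker_total())
}
```

   With this shape, an operator only has to call `init_mem_used` once; releasing the memory needs no explicit call because it happens when the tracker goes out of scope.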
   
   # Are there any user-facing changes?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794407926



##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -139,41 +143,43 @@ impl ExternalSorter {
                 let stream = read_spill_as_stream(spill, self.schema.clone())?;
                 streams.push(SortedStream::new(stream, 0));
             }
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
                 streams,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
-                partition,
+                tracking_metrics,
                 self.runtime.clone(),
             )))
         } else if in_mem_batches.len() > 0 {
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             let result = in_mem_partial_sort(
                 &mut *in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
+                tracking_metrics,
             );
-            self.inner_metrics.mem_used().set(0);
-            // TODO: the result size is not tracked

Review comment:
       This is the reason I wrote this PR in the first place 🤔: a large sorted record batch whose memory is not tracked.







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r795044951



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -270,25 +270,35 @@ impl MemoryManager {
                     requesters: Arc::new(Mutex::new(HashSet::new())),
                     pool_size,
                     requesters_total: Arc::new(Mutex::new(0)),
-                    trackers_total: Arc::new(Mutex::new(0)),
+                    trackers_total: AtomicUsize::new(0),
                     cv: Condvar::new(),
                 })
             }
         }
     }
 
     fn get_tracker_total(&self) -> usize {
-        *self.trackers_total.lock().unwrap()
+        self.trackers_total.load(Ordering::SeqCst)
     }
 
     pub(crate) fn grow_tracker_usage(&self, delta: usize) {
-        *self.trackers_total.lock().unwrap() += delta;
+        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
     }
 
     pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
-        let mut total = self.trackers_total.lock().unwrap();
-        assert!(*total >= delta);
-        *total -= delta;
+        let update =
+            self.trackers_total
+                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {

Review comment:
       I don't think `fetch_update` can loop forever here. A simple case, zero minus ten, results in underflow:
   https://play.rust-lang.org/?version=stable&mode=release&edition=2021
   
   I agree the mutex version is easier to write and reason about. I can revert the last commit if preferred.
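   
   To illustrate the point, here is a small sketch of how `AtomicUsize::fetch_update` behaves when its closure returns `None`: it returns `Err` with the current value and stops, rather than retrying. The helper `try_shrink` and the use of `checked_sub` in the closure are assumptions for illustration, since the closure body in the diff above is cut off.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: try to subtract `delta` from `total`.
/// Returns `Ok(previous)` on success, or `Err(current)` if the
/// subtraction would underflow (the closure returned `None`).
pub fn try_shrink(total: &AtomicUsize, delta: usize) -> Result<usize, usize> {
    total.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
        x.checked_sub(delta)
    })
}

/// Zero minus ten: the call returns immediately with `Err(0)` and
/// the counter is left untouched.
pub fn underflow_case() -> (Result<usize, usize>, usize) {
    let total = AtomicUsize::new(0);
    let outcome = try_shrink(&total, 10);
    (outcome, total.load(Ordering::SeqCst))
}
```

   The caller still has to decide what to do with the `Err`, which is part of why the mutex version with a plain `assert!` is easier to follow.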







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794984468



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -245,10 +245,10 @@ The memory management architecture is the following:
 /// Manage memory usage during physical plan execution
 #[derive(Debug)]
 pub struct MemoryManager {
-    requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
-    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
+    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
     pool_size: usize,
     requesters_total: Arc<Mutex<usize>>,

Review comment:
       This lock only guarantees that the two operations, updating `requesters_total` and calling `cv.notify_all`, are performed atomically, but it looks like this doesn't really buy us anything? The waiter on `self.cv` can wake up and get preempted right away by other threads that might update `requesters_total`. I am curious what benefit you see this critical region providing.







[GitHub] [arrow-datafusion] alamb merged pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691


   








[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r795069168



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -270,25 +270,35 @@ impl MemoryManager {
                     requesters: Arc::new(Mutex::new(HashSet::new())),
                     pool_size,
                     requesters_total: Arc::new(Mutex::new(0)),
-                    trackers_total: Arc::new(Mutex::new(0)),
+                    trackers_total: AtomicUsize::new(0),
                     cv: Condvar::new(),
                 })
             }
         }
     }
 
     fn get_tracker_total(&self) -> usize {
-        *self.trackers_total.lock().unwrap()
+        self.trackers_total.load(Ordering::SeqCst)
     }
 
     pub(crate) fn grow_tracker_usage(&self, delta: usize) {
-        *self.trackers_total.lock().unwrap() += delta;
+        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
     }
 
     pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
-        let mut total = self.trackers_total.lock().unwrap();
-        assert!(*total >= delta);
-        *total -= delta;
+        let update =
+            self.trackers_total
+                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {

Review comment:
       This PR is ready to go I think -- we can always keep iterating in the next one. Sorry for the delay in merging @yjshen 







[GitHub] [arrow-datafusion] yjshen commented on pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#issuecomment-1024846856


   I've changed `trackers_total` to use `AtomicUsize`, PTAL.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794977990



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -245,10 +245,10 @@ The memory management architecture is the following:
 /// Manage memory usage during physical plan execution
 #[derive(Debug)]
 pub struct MemoryManager {
-    requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
-    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
+    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
     pool_size: usize,
     requesters_total: Arc<Mutex<usize>>,

Review comment:
       Currently, `requesters_total` is combined with the `Condvar` declared later to stop late-arriving requesters from spilling frequently (since the earlier consumers may already occupy much of the memory). They wait for a notification when holding less than 1/2n of the memory. Any suggestions on this?
   
   The code here would be much simpler if `Arc<Mutex<usize>>` were replaced by `AtomicUsize`.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r795000268



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -245,10 +245,10 @@ The memory management architecture is the following:
 /// Manage memory usage during physical plan execution
 #[derive(Debug)]
 pub struct MemoryManager {
-    requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
-    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
+    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
     pool_size: usize,
     requesters_total: Arc<Mutex<usize>>,

Review comment:
       Never mind, I was wrong. Since the cv reacquires the lock on wake-up, a mutex is needed if we want to make sure the woken-up thread is not operating on a different `requesters_total` value.
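   
   The wake-up behaviour described here can be sketched with the standard library alone. `ToyPool`, `acquire`, and `release` are hypothetical names, and the waiting rule (wait until the request fits) is a simplification, not the policy in `MemoryManager`. `Condvar::wait_while` releases the mutex while the thread sleeps and reacquires it before the predicate is checked again, so the waiter always tests the value it will then update.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical pool: bytes in use guarded by a mutex, paired with a condvar.
pub struct ToyPool {
    used: Mutex<usize>,
    cv: Condvar,
    capacity: usize,
}

impl ToyPool {
    pub fn new(capacity: usize) -> Self {
        Self {
            used: Mutex::new(0),
            cv: Condvar::new(),
            capacity,
        }
    }

    /// Block until `size` bytes fit in the pool, then claim them.
    /// Callers must not ask for more than `capacity`, or this never returns.
    pub fn acquire(&self, size: usize) {
        let guard = self.used.lock().unwrap();
        // The predicate runs with the lock held, both on entry and after
        // every wake-up, so no other thread can change `used` in between
        // the check and the update below.
        let mut guard = self
            .cv
            .wait_while(guard, |used| *used + size > self.capacity)
            .unwrap();
        *guard += size;
    }

    /// Return `size` bytes and wake all waiters while still holding the lock.
    pub fn release(&self, size: usize) {
        let mut used = self.used.lock().unwrap();
        *used = (*used).saturating_sub(size);
        self.cv.notify_all();
    }

    pub fn used(&self) -> usize {
        *self.used.lock().unwrap()
    }
}

/// One thread holds 80 of 100 bytes; a second thread asking for 50 has to
/// wait until the first releases. Returns the bytes in use at the end.
pub fn waiter_demo() -> usize {
    let pool = Arc::new(ToyPool::new(100));
    pool.acquire(80);
    let pool2 = Arc::clone(&pool);
    let waiter = thread::spawn(move || pool2.acquire(50));
    pool.release(80);
    waiter.join().unwrap();
    pool.used()
}
```

   Replacing the `Mutex<usize>` with an atomic would lose this guarantee, because a condvar can only be paired with a mutex guard.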







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794735689



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -267,41 +267,37 @@ impl MemoryManager {
                 );
 
                 Arc::new(Self {
-                    requesters: Arc::new(Mutex::new(HashMap::new())),
-                    trackers: Arc::new(Mutex::new(HashMap::new())),
+                    requesters: Arc::new(Mutex::new(HashSet::new())),
                     pool_size,
                     requesters_total: Arc::new(Mutex::new(0)),
+                    trackers_total: Arc::new(Mutex::new(0)),
                     cv: Condvar::new(),
                 })
             }
         }
     }
 
     fn get_tracker_total(&self) -> usize {
-        let trackers = self.trackers.lock().unwrap();
-        if trackers.len() > 0 {
-            trackers.values().fold(0usize, |acc, y| match y.upgrade() {
-                None => acc,
-                Some(t) => acc + t.mem_used(),
-            })
-        } else {
-            0
-        }
+        *self.trackers_total.lock().unwrap()

Review comment:
       well that sure looks nicer 👍 
   

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -245,10 +245,10 @@ The memory management architecture is the following:
 /// Manage memory usage during physical plan execution
 #[derive(Debug)]
 pub struct MemoryManager {
-    requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
-    trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
+    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
     pool_size: usize,
     requesters_total: Arc<Mutex<usize>>,

Review comment:
       Maybe as a follow-on PR this can be changed to an `AtomicUsize` to avoid the mutex; I think the fetch-and-update code will be nicer.
   
   I think that would be a nice to have - not required.

##########
File path: datafusion/src/physical_plan/metrics/composite.rs
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics common for complex operators with multiple steps.
+
+use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
+    Timestamp,
+};
+use crate::physical_plan::Metric;
+use chrono::{TimeZone, Utc};
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+/// Collects all metrics during a complex operation, which is composed of multiple steps and
+/// each stage reports its statistics separately.
+/// Give sort as an example, when the dataset is more significant than available memory, it will report
+/// multiple in-mem sort metrics and final merge-sort  metrics from `SortPreservingMergeStream`.
+/// Therefore, We need a separation of metrics for which are final metrics (for output_rows accumulation),
+/// and which are intermediate metrics that we only account for elapsed_compute time.
+pub struct CompositeMetricsSet {

Review comment:
       this is a nicer name and a good description for `Aggregated` metrics 

##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -528,23 +520,29 @@ mod tests {
             .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 1.0).unwrap());
         let runtime = Arc::new(RuntimeEnv::new(config).unwrap());
 
-        let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
-        runtime.register_consumer(&(tracker1.clone() as Arc<dyn MemoryConsumer>));
+        DummyTracker::new(0, runtime.clone(), 5);
         assert_eq!(runtime.memory_manager.get_tracker_total(), 5);
 
-        let tracker2 = Arc::new(DummyTracker::new(0, runtime.clone(), 10));
-        runtime.register_consumer(&(tracker2.clone() as Arc<dyn MemoryConsumer>));
+        let tracker1 = DummyTracker::new(0, runtime.clone(), 10);
         assert_eq!(runtime.memory_manager.get_tracker_total(), 15);
 
-        let tracker3 = Arc::new(DummyTracker::new(0, runtime.clone(), 15));
-        runtime.register_consumer(&(tracker3.clone() as Arc<dyn MemoryConsumer>));
+        DummyTracker::new(0, runtime.clone(), 15);
         assert_eq!(runtime.memory_manager.get_tracker_total(), 30);
 
-        runtime.drop_consumer(tracker2.id());
+        runtime.drop_consumer(tracker1.id(), tracker1.mem_used);
+        assert_eq!(runtime.memory_manager.get_tracker_total(), 20);
+
+        // MemTrackingMetrics as an easy way to track memory
+        let ms = ExecutionPlanMetricsSet::new();
+        let tracking_metric = MemTrackingMetrics::new_with_rt(&ms, 0, runtime.clone());
+        tracking_metric.init_mem_used(15);

Review comment:
       👌  --very nice

##########
File path: datafusion/src/physical_plan/metrics/tracker.rs
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics with memory usage tracking capability
+
+use crate::execution::runtime_env::RuntimeEnv;
+use crate::execution::MemoryConsumerId;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
+};
+use std::sync::Arc;
+use std::task::Poll;
+
+use arrow::{error::ArrowError, record_batch::RecordBatch};
+
+/// Simplified version of tracking memory consumer,
+/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking)
+///
+/// You could use this to replace [BaselineMetrics], report the memory,

Review comment:
       I think it would make sense to combine `MemTrackingMetrics` and `BaselineMetrics` -- for example, put the `runtime` and `id` fields into `BaselineMetrics`.
   
   The rationale is that we would eventually like all `ExecutionPlan` operations to report their memory usage (as well as row count and time spent), so using the existing `BaselineMetrics` would make that easy.

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -139,41 +143,43 @@ impl ExternalSorter {
                 let stream = read_spill_as_stream(spill, self.schema.clone())?;
                 streams.push(SortedStream::new(stream, 0));
             }
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
                 streams,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
-                partition,
+                tracking_metrics,
                 self.runtime.clone(),
             )))
         } else if in_mem_batches.len() > 0 {
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             let result = in_mem_partial_sort(
                 &mut *in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
+                tracking_metrics,
             );
-            self.inner_metrics.mem_used().set(0);
-            // TODO: the result size is not tracked
+            self.metrics.mem_used().set(0);

Review comment:
       ```suggestion
               // Report to the memory manager we are no longer using memory
               self.metrics.mem_used().set(0);
   ```

##########
File path: datafusion/src/physical_plan/metrics/tracker.rs
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics with memory usage tracking capability
+
+use crate::execution::runtime_env::RuntimeEnv;
+use crate::execution::MemoryConsumerId;
+use crate::physical_plan::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
+};
+use std::sync::Arc;
+use std::task::Poll;
+
+use arrow::{error::ArrowError, record_batch::RecordBatch};
+
+/// Simplified version of tracking memory consumer,
+/// see also: [`Tracking`](crate::execution::memory_manager::ConsumerType::Tracking)
+///
+/// You could use this to replace [BaselineMetrics], report the memory,
+/// and get the memory usage bookkeeping in the memory manager easily.
+#[derive(Debug)]
+pub struct MemTrackingMetrics {
+    id: MemoryConsumerId,
+    runtime: Option<Arc<RuntimeEnv>>,
+    metrics: BaselineMetrics,
+}
+
+/// Delegates most of the metrics functionalities to the inner BaselineMetrics,
+/// intercept memory metrics functionalities and do memory manager bookkeeping.
+impl MemTrackingMetrics {
+    /// Create metrics similar to [BaselineMetrics]
+    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        let id = MemoryConsumerId::new(partition);
+        Self {
+            id,
+            runtime: None,
+            metrics: BaselineMetrics::new(metrics, partition),
+        }
+    }
+
+    /// Create memory tracking metrics with reference to runtime
+    pub fn new_with_rt(
+        metrics: &ExecutionPlanMetricsSet,
+        partition: usize,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let id = MemoryConsumerId::new(partition);
+        Self {
+            id,
+            runtime: Some(runtime),
+            metrics: BaselineMetrics::new(metrics, partition),
+        }
+    }
+
+    /// return the metric for cpu time spend in this operator
+    pub fn elapsed_compute(&self) -> &Time {
+        self.metrics.elapsed_compute()
+    }
+
+    /// return the size for current memory usage
+    pub fn mem_used(&self) -> usize {
+        self.metrics.mem_used().value()
+    }
+
+    /// setup initial memory usage and register it with memory manager
+    pub fn init_mem_used(&self, size: usize) {
+        self.metrics.mem_used().set(size);
+        if let Some(rt) = self.runtime.as_ref() {
+            rt.memory_manager.grow_tracker_usage(size);
+        }
+    }
+
+    /// return the metric for the total number of output rows produced
+    pub fn output_rows(&self) -> &Count {
+        self.metrics.output_rows()
+    }
+
+    /// Records the fact that this operator's execution is complete
+    /// (recording the `end_time` metric).
+    ///
+    /// Note care should be taken to call `done()` manually if
+    /// `MemTrackingMetrics` is not `drop`ped immediately upon operator
+    /// completion, as async streams may not be dropped immediately
+    /// depending on the consumer.
+    pub fn done(&self) {
+        self.metrics.done()
+    }
+
+    /// Record that some number of rows have been produced as output
+    ///
+    /// See the [`RecordOutput`] for conveniently recording record
+    /// batch output for other thing
+    pub fn record_output(&self, num_rows: usize) {
+        self.metrics.record_output(num_rows)
+    }
+
+    /// Process a poll result of a stream producing output for an
+    /// operator, recording the output rows and stream done time and
+    /// returning the same poll result
+    pub fn record_poll(
+        &self,
+        poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
+    ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
+        self.metrics.record_poll(poll)
+    }
+}
+
+impl Drop for MemTrackingMetrics {
+    fn drop(&mut self) {
+        self.metrics.try_done();
+        if self.mem_used() != 0 {
+            if let Some(rt) = self.runtime.as_ref() {
+                rt.drop_consumer(&self.id, self.mem_used());

Review comment:
       👍 







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r795040458



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -270,25 +270,35 @@ impl MemoryManager {
                     requesters: Arc::new(Mutex::new(HashSet::new())),
                     pool_size,
                     requesters_total: Arc::new(Mutex::new(0)),
-                    trackers_total: Arc::new(Mutex::new(0)),
+                    trackers_total: AtomicUsize::new(0),
                     cv: Condvar::new(),
                 })
             }
         }
     }
 
     fn get_tracker_total(&self) -> usize {
-        *self.trackers_total.lock().unwrap()
+        self.trackers_total.load(Ordering::SeqCst)
     }
 
     pub(crate) fn grow_tracker_usage(&self, delta: usize) {
-        *self.trackers_total.lock().unwrap() += delta;
+        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
     }
 
     pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
-        let mut total = self.trackers_total.lock().unwrap();
-        assert!(*total >= delta);
-        *total -= delta;
+        let update =
+            self.trackers_total
+                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {

Review comment:
       Would this potentially end up in an infinite loop if `x` is zero, since the closure would always return `None`?
   
   After seeing it written this way, I agree that the original implementation using `Mutex` was better. Sorry about that, @yjshen; I didn't realize the subtleties involved here.
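
   On the looping question: per the standard library documentation, `AtomicUsize::fetch_update` returns `Err(previous_value)` as soon as the closure returns `None`, and retries only when the value was changed by another thread in the meantime. The sketch below illustrates that behavior. `TrackerTotal` and `try_shrink` are hypothetical names chosen for illustration, not the code in this PR.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the `trackers_total` counter in `MemoryManager`.
pub struct TrackerTotal {
    total: AtomicUsize,
}

impl TrackerTotal {
    pub fn new(initial: usize) -> Self {
        Self {
            total: AtomicUsize::new(initial),
        }
    }

    /// Current tracked total.
    pub fn get(&self) -> usize {
        self.total.load(Ordering::SeqCst)
    }

    /// Add `delta` to the tracked total.
    pub fn grow(&self, delta: usize) {
        self.total.fetch_add(delta, Ordering::SeqCst);
    }

    /// Subtract `delta` without underflowing.
    /// Returns `Ok(previous)` on success, or `Err(current)` when `delta`
    /// exceeds the current total (the closure returned `None`).
    pub fn try_shrink(&self, delta: usize) -> Result<usize, usize> {
        self.total
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                x.checked_sub(delta)
            })
    }
}
```

   With this shape the caller decides what an `Err` means (for example, asserting or logging), which is the part the original `Mutex` version expressed with `assert!(*total >= delta)`.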







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1691: Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1691:
URL: https://github.com/apache/arrow-datafusion/pull/1691#discussion_r794407926



##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -139,41 +143,43 @@ impl ExternalSorter {
                 let stream = read_spill_as_stream(spill, self.schema.clone())?;
                 streams.push(SortedStream::new(stream, 0));
             }
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             Ok(Box::pin(SortPreservingMergeStream::new_from_streams(
                 streams,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
-                partition,
+                tracking_metrics,
                 self.runtime.clone(),
             )))
         } else if in_mem_batches.len() > 0 {
-            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            let tracking_metrics = self
+                .metrics_set
+                .new_final_tracking(partition, self.runtime.clone());
             let result = in_mem_partial_sort(
                 &mut *in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
-                baseline_metrics,
+                tracking_metrics,
             );
-            self.inner_metrics.mem_used().set(0);
-            // TODO: the result size is not tracked

Review comment:
       This is the reason I wrote this PR in the first place 🤔: a large sorted record batch whose memory was not tracked.
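
   The claim-then-free-on-drop pattern that makes this kind of tracking automatic can be sketched as below. This is only an illustration under stated assumptions: `TrackedPool` and `TrackedBytes` are hypothetical names, and the real `MemTrackingMetrics` reports to the runtime's memory manager rather than to a bare counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shared counter standing in for the memory manager's
/// total of tracked (non-requesting) consumers.
#[derive(Default)]
pub struct TrackedPool {
    used: AtomicUsize,
}

impl TrackedPool {
    /// Bytes currently reported by live guards.
    pub fn used(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
}

/// Hypothetical guard: reports its size when created and gives the
/// same amount back when dropped.
pub struct TrackedBytes {
    pool: Arc<TrackedPool>,
    size: usize,
}

impl TrackedBytes {
    pub fn new(pool: Arc<TrackedPool>, size: usize) -> Self {
        pool.used.fetch_add(size, Ordering::SeqCst);
        Self { pool, size }
    }
}

impl Drop for TrackedBytes {
    fn drop(&mut self) {
        // Release exactly what was claimed; skip the update for empty guards.
        if self.size != 0 {
            self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
        }
    }
}
```

   Holding such a guard next to the sorted batch ties the reported usage to the batch's lifetime, so no explicit "free" call is needed at the end of the stream.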






