Posted to commits@arrow.apache.org by ag...@apache.org on 2021/04/22 13:05:50 UTC

[arrow-datafusion] branch master updated: Use atomics for SQLMetric implementation, remove unused names (#25)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 434fbf7  Use atomics for SQLMetric implementation, remove unused names (#25)
434fbf7 is described below

commit 434fbf7ba75ee91bfdb5b8270b602959694199c1
Author: Ruan Pearce-Authers <ru...@outlook.com>
AuthorDate: Thu Apr 22 14:05:43 2021 +0100

    Use atomics for SQLMetric implementation, remove unused names (#25)
---
 datafusion/src/physical_plan/hash_aggregate.rs | 16 ++++-------
 datafusion/src/physical_plan/mod.rs            | 40 ++++++++++++++++----------
 datafusion/src/physical_plan/sort.rs           | 32 ++++++++-------------
 3 files changed, 43 insertions(+), 45 deletions(-)
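
In short, the commit drops the `Mutex` around each metric and the per-metric name string: `SQLMetric` now wraps an `AtomicUsize`, so it can be shared as a plain `Arc<SQLMetric>` and updated through `&self`. Below is a minimal sketch of the before/after pattern; the `Counter` type is illustrative only, not the actual DataFusion code.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Before: Arc<Mutex<SQLMetric>>, so every update took a lock:
    //     metric.lock().unwrap().add(n);
    // After: the value is an AtomicUsize and add() takes &self.
    struct Counter {
        value: AtomicUsize,
    }

    impl Counter {
        fn add(&self, n: usize) {
            self.value.fetch_add(n, Ordering::Relaxed);
        }

        fn value(&self) -> usize {
            self.value.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let metric = Arc::new(Counter { value: AtomicUsize::new(0) });
        metric.add(8); // no lock() and no &mut needed
        assert_eq!(metric.value(), 8);
    }

Dropping the name field is also why `counter()` and `time_nanos()` lose their `&str` argument below: the name now lives only as the key in the `HashMap` returned by `metrics()`.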

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 2342650..fd20b5c 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -18,7 +18,7 @@
 //! Defines the execution plan for the hash aggregate operation
 
 use std::any::Any;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use ahash::RandomState;
@@ -95,7 +95,7 @@ pub struct HashAggregateExec {
     /// to the partial aggregate
     input_schema: SchemaRef,
     /// Metric to track number of output rows
-    output_rows: Arc<Mutex<SQLMetric>>,
+    output_rows: Arc<SQLMetric>,
 }
 
 fn create_schema(
@@ -144,7 +144,7 @@ impl HashAggregateExec {
 
         let schema = Arc::new(schema);
 
-        let output_rows = SQLMetric::counter("outputRows");
+        let output_rows = SQLMetric::counter();
 
         Ok(HashAggregateExec {
             mode,
@@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec {
 
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         let mut metrics = HashMap::new();
-        metrics.insert(
-            "outputRows".to_owned(),
-            self.output_rows.lock().unwrap().clone(),
-        );
+        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
         metrics
     }
 }
@@ -292,7 +289,7 @@ pin_project! {
         #[pin]
         output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
         finished: bool,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     }
 }
 
@@ -644,7 +641,7 @@ impl GroupedHashAggregateStream {
         group_expr: Vec<Arc<dyn PhysicalExpr>>,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream {
                 };
 
                 if let Ok(batch) = &result {
-                    let mut output_rows = output_rows.lock().unwrap();
                     output_rows.add(batch.num_rows())
                 }
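
With the lock gone, the per-batch update in `GroupedHashAggregateStream` is a single `fetch_add`. A sketch of why concurrent producers still sum correctly under `Ordering::Relaxed`, using a bare `AtomicUsize` as a stand-in for `SQLMetric`:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    fn main() {
        let output_rows = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let m = Arc::clone(&output_rows);
                thread::spawn(move || {
                    for _ in 0..1_000 {
                        // one add() per batch in the real operator
                        m.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        // fetch_add is atomic, so no increments are lost; Relaxed only
        // forgoes ordering relative to *other* variables, which the
        // counter never relies on.
        assert_eq!(output_rows.load(Ordering::Relaxed), 4_000);
    }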
 
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 5036dcb..80dfe6e 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -18,7 +18,8 @@
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
 use std::fmt::{Debug, Display};
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
 use crate::execution::context::ExecutionContextState;
@@ -58,44 +59,53 @@ pub enum MetricType {
 
 /// SQL metric such as counter (number of input or output rows) or timing information about
 /// a physical operator.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct SQLMetric {
-    /// Metric name
-    name: String,
     /// Metric value
-    value: usize,
+    value: AtomicUsize,
     /// Metric type
     metric_type: MetricType,
 }
 
+impl Clone for SQLMetric {
+    fn clone(&self) -> Self {
+        Self {
+            value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
+            metric_type: self.metric_type.clone(),
+        }
+    }
+}
+
 impl SQLMetric {
+    // Relaxed ordering on `value` is safe: the counter is only updated
+    // atomically and is never used to synchronize other memory accesses.
+
     /// Create a new metric for tracking a counter
-    pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
-        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
+    pub fn counter() -> Arc<SQLMetric> {
+        Arc::new(SQLMetric::new(MetricType::Counter))
     }
 
     /// Create a new metric for tracking time in nanoseconds
-    pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
-        Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
+    pub fn time_nanos() -> Arc<SQLMetric> {
+        Arc::new(SQLMetric::new(MetricType::TimeNanos))
     }
 
     /// Create a new SQLMetric
-    pub fn new(name: &str, metric_type: MetricType) -> Self {
+    pub fn new(metric_type: MetricType) -> Self {
         Self {
-            name: name.to_owned(),
-            value: 0,
+            value: AtomicUsize::new(0),
             metric_type,
         }
     }
 
     /// Add to the value
-    pub fn add(&mut self, n: usize) {
-        self.value += n;
+    pub fn add(&self, n: usize) {
+        self.value.fetch_add(n, Ordering::Relaxed);
     }
 
     /// Get the current value
     pub fn value(&self) -> usize {
-        self.value
+        self.value.load(Ordering::Relaxed)
     }
 }
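
Note that `AtomicUsize` does not implement `Clone`, which is why `#[derive(Clone)]` is replaced by the hand-written `impl Clone` above: a clone loads the current value and freezes it, giving `metrics()` a point-in-time snapshot. A small sketch of that behaviour, with `Metric` as a hypothetical stand-in for `SQLMetric`:

    use std::sync::atomic::{AtomicUsize, Ordering};

    #[derive(Debug)]
    struct Metric {
        value: AtomicUsize,
    }

    impl Clone for Metric {
        fn clone(&self) -> Self {
            // A clone is a point-in-time snapshot of the live counter.
            Metric {
                value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
            }
        }
    }

    fn main() {
        let live = Metric { value: AtomicUsize::new(42) };
        let snapshot = live.clone();
        live.value.fetch_add(1, Ordering::Relaxed);
        assert_eq!(snapshot.value.load(Ordering::Relaxed), 42); // frozen
        assert_eq!(live.value.load(Ordering::Relaxed), 43);
    }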
 
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 26855b3..010e406 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -19,7 +19,7 @@
 
 use std::any::Any;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
 
@@ -52,9 +52,9 @@ pub struct SortExec {
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     /// Output rows
-    output_rows: Arc<Mutex<SQLMetric>>,
+    output_rows: Arc<SQLMetric>,
     /// Time to sort batches
-    sort_time_nanos: Arc<Mutex<SQLMetric>>,
+    sort_time_nanos: Arc<SQLMetric>,
 }
 
 impl SortExec {
@@ -66,8 +66,8 @@ impl SortExec {
         Ok(Self {
             expr,
             input,
-            output_rows: SQLMetric::counter("outputRows"),
-            sort_time_nanos: SQLMetric::time_nanos("sortTime"),
+            output_rows: SQLMetric::counter(),
+            sort_time_nanos: SQLMetric::time_nanos(),
         })
     }
 
@@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec {
 
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         let mut metrics = HashMap::new();
-        metrics.insert(
-            "outputRows".to_owned(),
-            self.output_rows.lock().unwrap().clone(),
-        );
-        metrics.insert(
-            "sortTime".to_owned(),
-            self.sort_time_nanos.lock().unwrap().clone(),
-        );
+        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
+        metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
         metrics
     }
 }
@@ -224,7 +218,7 @@ pin_project! {
         output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
         finished: bool,
         schema: SchemaRef,
-        output_rows: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
     }
 }
 
@@ -232,8 +226,8 @@ impl SortStream {
     fn new(
         input: SendableRecordBatchStream,
         expr: Vec<PhysicalSortExpr>,
-        output_rows: Arc<Mutex<SQLMetric>>,
-        sort_time: Arc<Mutex<SQLMetric>>,
+        output_rows: Arc<SQLMetric>,
+        sort_time: Arc<SQLMetric>,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -246,7 +240,6 @@ impl SortStream {
                 .and_then(move |batches| {
                     let now = Instant::now();
                     let result = sort_batches(&batches, &schema, &expr);
-                    let mut sort_time = sort_time.lock().unwrap();
                     sort_time.add(now.elapsed().as_nanos() as usize);
                     result
                 });
@@ -288,7 +281,6 @@ impl Stream for SortStream {
                 };
 
                 if let Some(Ok(batch)) = &result {
-                    let mut output_rows = output_rows.lock().unwrap();
                     output_rows.add(batch.num_rows());
                 }
 
@@ -431,8 +423,8 @@ mod tests {
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
 
         let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
-        assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
-        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
+        assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
+        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
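
The test change from `.value` to `.value()` follows directly from the field becoming an `AtomicUsize`: callers now go through the accessor, which performs the load. The `sortTime` metric uses the same timing pattern as `SortStream::new` above; a sketch of that pattern under the same assumptions, with a bare `AtomicUsize` standing in for `SQLMetric::time_nanos()`:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Instant;

    // Time a closure and fold the elapsed nanoseconds into a shared
    // atomic metric, mirroring the sort_time.add(...) call above.
    fn timed<T>(metric: &AtomicUsize, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        metric.fetch_add(start.elapsed().as_nanos() as usize, Ordering::Relaxed);
        out
    }

    fn main() {
        let sort_time_nanos = Arc::new(AtomicUsize::new(0));
        let mut v: Vec<u64> = (0..10_000).rev().collect();
        timed(&sort_time_nanos, || v.sort());
        assert!(sort_time_nanos.load(Ordering::Relaxed) > 0);
    }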