You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/04/22 13:05:50 UTC
[arrow-datafusion] branch master updated: Use atomics for SQLMetric implementation, remove unused names (#25)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 434fbf7 Use atomics for SQLMetric implementation, remove unused names (#25)
434fbf7 is described below
commit 434fbf7ba75ee91bfdb5b8270b602959694199c1
Author: Ruan Pearce-Authers <ru...@outlook.com>
AuthorDate: Thu Apr 22 14:05:43 2021 +0100
Use atomics for SQLMetric implementation, remove unused names (#25)
---
datafusion/src/physical_plan/hash_aggregate.rs | 16 ++++-------
datafusion/src/physical_plan/mod.rs | 40 ++++++++++++++++----------
datafusion/src/physical_plan/sort.rs | 32 ++++++++-------------
3 files changed, 43 insertions(+), 45 deletions(-)
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 2342650..fd20b5c 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -18,7 +18,7 @@
//! Defines the execution plan for the hash aggregate operation
use std::any::Any;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{Context, Poll};
use ahash::RandomState;
@@ -95,7 +95,7 @@ pub struct HashAggregateExec {
/// to the partial aggregate
input_schema: SchemaRef,
/// Metric to track number of output rows
- output_rows: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
}
fn create_schema(
@@ -144,7 +144,7 @@ impl HashAggregateExec {
let schema = Arc::new(schema);
- let output_rows = SQLMetric::counter("outputRows");
+ let output_rows = SQLMetric::counter();
Ok(HashAggregateExec {
mode,
@@ -253,10 +253,7 @@ impl ExecutionPlan for HashAggregateExec {
fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
- metrics.insert(
- "outputRows".to_owned(),
- self.output_rows.lock().unwrap().clone(),
- );
+ metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
metrics
}
}
@@ -292,7 +289,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
- output_rows: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
}
}
@@ -644,7 +641,7 @@ impl GroupedHashAggregateStream {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
- output_rows: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
@@ -702,7 +699,6 @@ impl Stream for GroupedHashAggregateStream {
};
if let Ok(batch) = &result {
- let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows())
}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 5036dcb..80dfe6e 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -18,7 +18,8 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
use std::fmt::{Debug, Display};
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
use std::{any::Any, pin::Pin};
use crate::execution::context::ExecutionContextState;
@@ -58,44 +59,53 @@ pub enum MetricType {
/// SQL metric such as counter (number of input or output rows) or timing information about
/// a physical operator.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct SQLMetric {
- /// Metric name
- name: String,
/// Metric value
- value: usize,
+ value: AtomicUsize,
/// Metric type
metric_type: MetricType,
}
+impl Clone for SQLMetric {
+ fn clone(&self) -> Self {
+ Self {
+ value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
+ metric_type: self.metric_type.clone(),
+ }
+ }
+}
+
impl SQLMetric {
+ // relaxed ordering for operations on `value` poses no issues
+ // we're purely using atomic ops with no associated memory ops
+
/// Create a new metric for tracking a counter
- pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
- Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
+ pub fn counter() -> Arc<SQLMetric> {
+ Arc::new(SQLMetric::new(MetricType::Counter))
}
/// Create a new metric for tracking time in nanoseconds
- pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
- Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
+ pub fn time_nanos() -> Arc<SQLMetric> {
+ Arc::new(SQLMetric::new(MetricType::TimeNanos))
}
/// Create a new SQLMetric
- pub fn new(name: &str, metric_type: MetricType) -> Self {
+ pub fn new(metric_type: MetricType) -> Self {
Self {
- name: name.to_owned(),
- value: 0,
+ value: AtomicUsize::new(0),
metric_type,
}
}
/// Add to the value
- pub fn add(&mut self, n: usize) {
- self.value += n;
+ pub fn add(&self, n: usize) {
+ self.value.fetch_add(n, Ordering::Relaxed);
}
/// Get the current value
pub fn value(&self) -> usize {
- self.value
+ self.value.load(Ordering::Relaxed)
}
}
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 26855b3..010e406 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -19,7 +19,7 @@
use std::any::Any;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
@@ -52,9 +52,9 @@ pub struct SortExec {
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
/// Output rows
- output_rows: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
/// Time to sort batches
- sort_time_nanos: Arc<Mutex<SQLMetric>>,
+ sort_time_nanos: Arc<SQLMetric>,
}
impl SortExec {
@@ -66,8 +66,8 @@ impl SortExec {
Ok(Self {
expr,
input,
- output_rows: SQLMetric::counter("outputRows"),
- sort_time_nanos: SQLMetric::time_nanos("sortTime"),
+ output_rows: SQLMetric::counter(),
+ sort_time_nanos: SQLMetric::time_nanos(),
})
}
@@ -147,14 +147,8 @@ impl ExecutionPlan for SortExec {
fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
- metrics.insert(
- "outputRows".to_owned(),
- self.output_rows.lock().unwrap().clone(),
- );
- metrics.insert(
- "sortTime".to_owned(),
- self.sort_time_nanos.lock().unwrap().clone(),
- );
+ metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
+ metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
metrics
}
}
@@ -224,7 +218,7 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
- output_rows: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
}
}
@@ -232,8 +226,8 @@ impl SortStream {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
- output_rows: Arc<Mutex<SQLMetric>>,
- sort_time: Arc<Mutex<SQLMetric>>,
+ output_rows: Arc<SQLMetric>,
+ sort_time: Arc<SQLMetric>,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
@@ -246,7 +240,6 @@ impl SortStream {
.and_then(move |batches| {
let now = Instant::now();
let result = sort_batches(&batches, &schema, &expr);
- let mut sort_time = sort_time.lock().unwrap();
sort_time.add(now.elapsed().as_nanos() as usize);
result
});
@@ -288,7 +281,6 @@ impl Stream for SortStream {
};
if let Some(Ok(batch)) = &result {
- let mut output_rows = output_rows.lock().unwrap();
output_rows.add(batch.num_rows());
}
@@ -431,8 +423,8 @@ mod tests {
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
- assert!(sort_exec.metrics().get("sortTime").unwrap().value > 0);
- assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value, 8);
+ assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
+ assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
assert_eq!(result.len(), 1);
let columns = result[0].columns();