Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/16 19:09:17 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request #901: Implement new metrics API

alamb opened a new pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901


   # Which issue does this PR close?
   
   Closes https://github.com/apache/arrow-datafusion/issues/679
   
   Note: If people basically like this API I will go ahead and add unit tests for metrics.rs (e.g. for aggregate_by_partition)
   
   # Rationale for this change
   See the description on https://github.com/apache/arrow-datafusion/issues/679#issue-937015511 for the full rationale, but the TLDR version is:
   1. Better align the `SQLMetric` data model to ease integration with other metric systems (e.g. Prometheus, InfluxDB, etc.)
   2. Ability to get per-partition metrics
   3. Ability to get current metric values *during* execution
   
   
   # What changes are included in this PR?
   1. Update the `SQLMetric` API to be in its own module, have labels, know about partitions, and allow for real-time inspection
   2. Update uses of `SQLMetric` in DataFusion and Ballista to the new API
   3. Functionality to aggregate (sum) metrics via predicate and via partition
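
A minimal sketch of the data model described above, assuming simplified stand-in types: `Metric`, `Label`, and `record_batches` are hypothetical names for illustration, not the types in this PR. It shows a metric that carries labels and an optional partition, and whose value can be read through a shared handle while it is still being updated.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Hypothetical stand-in for a `name=value` label.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Label {
    pub name: String,
    pub value: String,
}

/// Hypothetical, simplified metric: an atomic value plus identifying data.
#[derive(Debug)]
pub struct Metric {
    pub name: String,
    /// `None` means the metric is not tied to a single partition.
    pub partition: Option<usize>,
    pub labels: Vec<Label>,
    value: AtomicUsize,
}

impl Metric {
    pub fn new(name: &str, partition: Option<usize>, labels: Vec<Label>) -> Self {
        Self {
            name: name.to_string(),
            partition,
            labels,
            value: AtomicUsize::new(0),
        }
    }

    /// Add `n` to the current value; callable through a shared reference.
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Read the current value, even while execution is still running.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// Simulate one partition updating a metric and record what an
/// observer holding the same `Arc` would see after each batch.
pub fn record_batches(metric: &Arc<Metric>, batch_sizes: &[usize]) -> Vec<usize> {
    let mut observed = Vec::new();
    for rows in batch_sizes {
        metric.add(*rows);
        observed.push(metric.value());
    }
    observed
}
```

Because the value lives in an atomic behind an `Arc`, an observer does not need to wait for the operator to finish before reading it.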
   
   
   # Are there any user-facing changes?
   No
   
   The `SQLMetric` API is now substantially different, so any code that creates or uses `SQLMetric`s will have to be updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] andygrove commented on pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-901362155


   > Given that snake case is the standard in Rust as well, I would probably be inclined to update the metric names to use snake case as well
   
   This makes sense. I didn't even think about the casing. I just spend too much time looking at Spark query plans so that influenced my initial work.





[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r690595405



##########
File path: datafusion/src/physical_plan/metrics/wrappers.rs
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Wrappers for `SQLMetric`s for more conveniently recording execution metrics
+
+use std::{sync::Arc, time::Instant};
+
+use super::SQLMetric;
+
+// pub trait MetricSource {
+//     /// Return the underlying metric
+//     pub fn metric(&self) -> Arc<SQLMetric>;
+// }
+
+/// a SQLMetric wrapper for a counter (number of input or output rows)
+///
+/// Note: cloned counters update the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Count {
+    inner: Arc<SQLMetric>,
+}
+
+impl Count {
+    /// create a new counter wrapper around this metric
+    pub fn new(inner: Arc<SQLMetric>) -> Self {
+        Self { inner }
+    }
+
+    /// Add `n` to the counter's value
+    pub fn add(&self, n: usize) {
+        self.inner.add(n)
+    }
+}
+
+/// a SQLMetric wrapper for CPU timing information
+#[derive(Debug, Clone)]
+pub struct Time {
+    inner: Arc<SQLMetric>,
+}
+
+impl Time {
+    /// Create a new [`Time`] wrapper suitable for recording elapsed
+    /// times for operations.
+    pub fn new(inner: Arc<SQLMetric>) -> Self {

Review comment:
       I wonder if these should be verifying the MetricKind of the SQLMetric? Or alternatively be private and have a member function on SQLMetric that does the verification
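
One way the verification suggested above could look, as a sketch only: `Kind`, `Metric`, and `try_new` are hypothetical, simplified names, and the PR's own `MetricKind` has a different variant set (its `Custom` variant does not say whether the value is a count or a time, so a real check would need more information than shown here).

```rust
use std::sync::Arc;

/// Hypothetical, simplified metric kinds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Kind {
    Count,
    Time,
}

/// Hypothetical, simplified metric that only remembers its kind.
#[derive(Debug)]
pub struct Metric {
    kind: Kind,
}

impl Metric {
    pub fn new(kind: Kind) -> Self {
        Self { kind }
    }

    pub fn kind(&self) -> &Kind {
        &self.kind
    }
}

/// Wrapper that should only ever hold a timing metric.
#[derive(Debug, Clone)]
pub struct Time {
    inner: Arc<Metric>,
}

impl Time {
    /// Wrap `inner` only if it is a timing metric; otherwise report
    /// the mismatch instead of silently accepting the wrong kind.
    pub fn try_new(inner: Arc<Metric>) -> Result<Self, String> {
        if *inner.kind() == Kind::Time {
            Ok(Self { inner })
        } else {
            Err(format!("expected Time metric, got {:?}", inner.kind()))
        }
    }

    pub fn inner(&self) -> &Arc<Metric> {
        &self.inner
    }
}
```

The alternative mentioned in the comment, keeping the constructor private and exposing a checked method on the metric itself, would move the same comparison to the other type.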







[GitHub] [arrow-datafusion] alamb commented on pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-901359513


   @andygrove / @returnString  as you implemented the `SQLMetric`s initially and have worked with them, do you have any context or opinion on the use of `snake_case` vs `camelCase` names for the metrics? For example `numRows` vs `num_rows`? 
   
   From this PR's original description:
   
   > The current SQL counters use "camel case" for the counter names (e.g. `numRows`) rather than the Rust standard "snake case" (e.g. `num_rows`). I kept the same naming convention in this PR, but I wonder if we want to make them more Rust standard snake case given we are messing with them all anyways.
   
   @tustvold  notes that 
   
   > Edit: r.e. snake case vs camel case, FWIW most metrics systems I've interacted with don't support upper-case letters, nor hyphens, so snake case is pretty typical
   
   Given that snake case is the standard in Rust as well, I would probably be inclined to update the metric names to use snake case as well
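
For illustration, the renaming discussed above can be sketched with a small helper. `to_snake_case` is a hypothetical function, not part of this PR, and it assumes simple ASCII camelCase names; a run of capitals such as `CPUTime` would be split letter by letter, so such names would need special handling.

```rust
/// Convert a simple ASCII camelCase name such as `numRows` to snake_case.
/// Hypothetical helper: every uppercase letter starts a new word.
pub fn to_snake_case(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 4);
    for (i, c) in name.chars().enumerate() {
        if c.is_ascii_uppercase() {
            // Separate words with an underscore, except at the start.
            if i > 0 {
                out.push('_');
            }
            out.push(c.to_ascii_lowercase());
        } else {
            out.push(c);
        }
    }
    out
}
```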
   
   





[GitHub] [arrow-datafusion] alamb commented on pull request #901: Implement new metrics API / RFC

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-902096652


   This PR has changed enough since initial feedback that I have opened a second one https://github.com/apache/arrow-datafusion/pull/908 with the updates. Closing this one. 





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r689806700



##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? `None` means all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user-defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>

Review comment:
       Here are some of the aggregation primitives (sum and group by partition). I feel this API may grow as we understand the use cases more
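
The two primitives mentioned above can be sketched on plain values as follows. This is a simplified illustration under stated assumptions: `Metric` here is a hypothetical snapshot struct keyed by name only (the PR keys on kind and labels and stores atomic values), and the standard library `HashMap` is used in place of `hashbrown`.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified metric snapshot (plain values, no atomics).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metric {
    pub name: String,
    pub partition: Option<usize>,
    pub value: usize,
}

/// Sum the values of all metrics for which `pred` returns true.
/// Returns `None` if no metric matches, so "absent" differs from zero.
pub fn sum<F>(metrics: &[Metric], mut pred: F) -> Option<usize>
where
    F: FnMut(&Metric) -> bool,
{
    metrics
        .iter()
        .filter(|m| pred(*m))
        .map(|m| m.value)
        .fold(None, |acc: Option<usize>, v| Some(acc.unwrap_or(0) + v))
}

/// Collapse per-partition metrics into one metric per name, with
/// `partition` set to `None` on every result.
pub fn aggregate_by_partition(metrics: &[Metric]) -> Vec<Metric> {
    let mut totals: HashMap<&str, usize> = HashMap::new();
    for m in metrics {
        *totals.entry(m.name.as_str()).or_insert(0) += m.value;
    }

    let mut out: Vec<Metric> = totals
        .into_iter()
        .map(|(name, value)| Metric {
            name: name.to_string(),
            partition: None,
            value,
        })
        .collect();

    // HashMap iteration order is unspecified, so sort for stable output.
    out.sort_by(|a, b| a.name.cmp(&b.name));
    out
}
```

Returning `Option<usize>` from `sum` mirrors the doc comment in the diff: a caller can tell a metric that was never recorded apart from one whose total is zero.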







[GitHub] [arrow-datafusion] alamb commented on pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-901369028


   > This makes sense. I didn't even think about the casing. I just spend too much time looking at Spark query plans so that influenced my initial work.
   
   Thanks @andygrove -- I will update this proposal to switch the names to snake_case





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r691527138



##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with partition=`None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumulate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {

Review comment:
       Renamed to `ExecutionPlanMetricsSet` and added some clarifying doc comments

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If None, it applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with `partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumulate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {
+    inner: Mutex<MetricsSet>,
+}
+
+impl SharedMetricsSet {
+    /// Create a new empty shared metrics set
+    pub fn new() -> Self {
+        Self {
+            inner: Mutex::new(MetricsSet::new()),
+        }
+    }
+
+    /// Add the specified metric
+    pub fn register(&self, metric: Arc<SQLMetric>) {
+        self.inner.lock().expect("not poisoned").push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s for this `ExecutionPlan` to the
+    /// specified array.
+    pub fn extend_other(&self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        self.inner
+            .lock()
+            .expect("not poisoned")
+            .extend_other(metrics)
+    }
+
+    /// Return a clone of the inner MetricsSet
+    pub fn clone_inner(&self) -> MetricsSet {
+        let guard = self.inner.lock().expect("not poisoned");
+        (*guard).clone()
+    }
+}
+
+/// name=value pairs identifying a metric. This concept is called various things
+/// in various different systems:
+///
+/// "labels" in
+/// [prometheus](https://prometheus.io/docs/concepts/data_model/) and
+/// "tags" in
+/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
+/// "attributes" in [open
+/// telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md),
+/// etc.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Label {
+    name: Arc<str>,

Review comment:
       Changed. 







[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r690573748



##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: Arc<SharedMetricsSet>,

Review comment:
       Why is this an Arc?

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If None, it applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {

Review comment:
       I like that metric recording doesn't involve any string manipulation :+1:
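
       To illustrate the point, here is a minimal sketch (not the PR's code) of why recording stays cheap: the name is allocated once when the metric is built, and each `add` is a single relaxed atomic increment. The `Metric` struct and `record_concurrently` helper are hypothetical stand-ins for `SQLMetric`, reduced to the parts that matter here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the PR's `SQLMetric`; not the actual type.
struct Metric {
    // Allocated once at construction; never touched while recording.
    name: Arc<str>,
    value: AtomicUsize,
}

impl Metric {
    fn new(name: &str) -> Self {
        Self {
            name: Arc::from(name),
            value: AtomicUsize::new(0),
        }
    }

    // Recording is one atomic add: no locks, no string handling.
    fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

// Spawn `threads` threads that each add 1 to the metric `per_thread` times,
// then return the final value once all of them have finished.
fn record_concurrently(metric: &Arc<Metric>, threads: usize, per_thread: usize) -> usize {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let metric = Arc::clone(metric);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    metric.add(1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("recording thread panicked");
    }
    metric.value()
}

fn main() {
    let metric = Arc::new(Metric::new("outputRows"));
    let total = record_concurrently(&metric, 4, 1_000);
    println!("{}={}", metric.name, total);
}
```

       Joining the threads before reading is what makes the final total reliable in this sketch; a reader that inspects the value while execution is still running only sees a snapshot.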

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If None, it applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),

Review comment:
       I wonder if this should just be `&'static str` and MetricKind made Copy
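
       A rough sketch of what that alternative could look like, assuming custom metric names are always string literals. The enum below is a hypothetical variant of `MetricKind`, not the code in this PR, and `sum_by_kind` is an illustrative helper showing the kind being used as a map key without `.clone()`.

```rust
use std::collections::HashMap;

// Hypothetical variant of `MetricKind` with a `&'static str` payload,
// which lets the whole enum derive `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum MetricKind {
    OutputRows,
    CpuTime,
    Custom(&'static str),
}

impl MetricKind {
    // Matching on `*self` copies the enum, so the returned name is `'static`.
    fn name(&self) -> &'static str {
        match *self {
            MetricKind::OutputRows => "outputRows",
            MetricKind::CpuTime => "cpuTime",
            MetricKind::Custom(name) => name,
        }
    }
}

// Sum values per kind; `kind` is copied into the map key, no `.clone()` needed.
fn sum_by_kind(samples: &[(MetricKind, usize)]) -> HashMap<MetricKind, usize> {
    let mut totals = HashMap::new();
    for &(kind, value) in samples {
        *totals.entry(kind).or_insert(0) += value;
    }
    totals
}

fn main() {
    let samples = [
        (MetricKind::OutputRows, 10),
        (MetricKind::Custom("spillCount"), 1),
        (MetricKind::OutputRows, 5),
        (MetricKind::CpuTime, 7),
    ];
    let totals = sum_by_kind(&samples);
    println!(
        "{}={}",
        MetricKind::OutputRows.name(),
        totals[&MetricKind::OutputRows]
    );
}
```

       The trade-off is that a name built at runtime (for example with `format!`) would not fit this shape without leaking the string, whereas `Arc<str>` accepts both literals and owned strings.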

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? `None` means all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user-defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with partition=`None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumuate with no partition

Review comment:
       ```suggestion
                       // accumulate with no partition
   ```

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? `None` means all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user-defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with partition=`None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumuate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {
+    inner: Mutex<MetricsSet>,
+}
+
+impl SharedMetricsSet {
+    /// Create a new empty shared metrics set
+    pub fn new() -> Self {
+        Self {
+            inner: Mutex::new(MetricsSet::new()),
+        }
+    }
+
+    /// Add the specified metric
+    pub fn register(&self, metric: Arc<SQLMetric>) {
+        self.inner.lock().expect("not poisoned").push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s for this `ExecutionPlan` to the
+    /// specified array.
+    pub fn extend_other(&self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        self.inner
+            .lock()
+            .expect("not poisoned")
+            .extend_other(metrics)
+    }
+
+    /// Return a clone of the inner MetricsSet
+    pub fn clone_inner(&self) -> MetricsSet {
+        let guard = self.inner.lock().expect("not poisoned");
+        (*guard).clone()
+    }
+}
+
+/// name=value pairs identifying a metric. This concept is called various things
+/// in different systems:
+///
+/// "labels" in
+/// [Prometheus](https://prometheus.io/docs/concepts/data_model/),
+/// "tags" in
+/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
+/// "attributes" in
+/// [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md),
+/// etc.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Label {
+    name: Arc<str>,

Review comment:
       See the comment on `with_new_label` about using `Cow<'static, str>`

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,

Review comment:
       The use of `Arc<str>` for this, for labels, etc. is a little surprising to me. I'd expect these to be static strings in most cases, except in a couple of places where they might be unique dynamic strings (e.g. the output of `format!`). Perhaps `Cow<'static, str>` would be a better fit, since it avoids allocations for static strings?
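
To make the suggestion concrete, here is a rough sketch of a `Label` that stores `Cow<'static, str>`. It is a hypothetical variant, not the code in this PR, and `is_fully_borrowed` is a helper added only to show when an allocation is avoided.

```rust
use std::borrow::Cow;

/// Hypothetical `Label` storing `Cow<'static, str>` instead of `Arc<str>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Accepts both `&'static str` (borrowed, no allocation) and `String` (owned)
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Illustration only: true when neither field owns a heap allocation
    pub fn is_fully_borrowed(&self) -> bool {
        matches!(
            (&self.name, &self.value),
            (Cow::Borrowed(_), Cow::Borrowed(_))
        )
    }
}
```

With this shape, `Label::new("filename", "a.parquet")` stays fully borrowed, while `Label::new("partition", format!("{}", 3))` owns its value. One cost to weigh: cloning a `Cow::Owned` copies the string, whereas cloning an `Arc<str>` only bumps a reference count, which matters wherever labels are cloned, such as in `aggregate_by_partition`.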

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? `None` means all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with partition=`None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumulate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {

Review comment:
       The name of this confused me for a while. On my first pass, `Shared` made me think it was shared between operators, which clearly isn't the case. I don't have a better suggestion for a name, but thought I'd mention it. Maybe `OperatorMetricsSet` or something, I don't know...
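
To make the naming question concrete, here is a minimal sketch of a set that belongs to one operator and is shared only across that operator's partitions. It is not the PR's implementation: `Metric`, `OperatorMetricsSet`, `register`, `total`, and `partitions` are hypothetical names, and the `Mutex<Vec<..>>` layout is an assumption, since the body of `SharedMetricsSet` is not shown in this hunk.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for the PR's `SQLMetric` (hypothetical):
/// an atomic value tagged with the partition it belongs to.
#[derive(Debug)]
pub struct Metric {
    value: AtomicUsize,
    partition: Option<usize>,
}

impl Metric {
    /// Add `n` to the metric's value.
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }
}

/// Hypothetical per-operator set. Each partition registers its own
/// metric here; nothing is shared between operators.
#[derive(Debug, Default)]
pub struct OperatorMetricsSet {
    // Assumption: interior mutability via a Mutex, so that `register`
    // can take `&self` while partitions execute concurrently.
    inner: Mutex<Vec<Arc<Metric>>>,
}

impl OperatorMetricsSet {
    /// Create and register a metric for `partition`.
    pub fn register(&self, partition: usize) -> Arc<Metric> {
        let metric = Arc::new(Metric {
            value: AtomicUsize::new(0),
            partition: Some(partition),
        });
        self.inner.lock().unwrap().push(Arc::clone(&metric));
        metric
    }

    /// Sum of the current values across all registered partitions.
    pub fn total(&self) -> usize {
        self.inner
            .lock()
            .unwrap()
            .iter()
            .map(|m| m.value.load(Ordering::Relaxed))
            .sum()
    }

    /// Partitions that have registered a metric, in registration order.
    pub fn partitions(&self) -> Vec<usize> {
        self.inner
            .lock()
            .unwrap()
            .iter()
            .filter_map(|m| m.partition)
            .collect()
    }
}
```

In this sketch the lock is taken only when a metric is registered or read in bulk; recording a value touches just the atomic.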

##########
File path: datafusion/src/physical_plan/metrics/wrappers.rs
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//!  wrappers for `SQLMetrics` for more conveniently recording execution metrics
+
+use std::{sync::Arc, time::Instant};
+
+use super::SQLMetric;
+
+// pub trait MetricSource {
+//     /// Return the underlying metric
+//     pub fn metric(&self) -> Arc<SQLMetric>;
+// }
+
+/// a SQLMetric wrapper for a counter (number of input or output rows)
+///
+/// Note: cloned counters update the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Count {
+    inner: Arc<SQLMetric>,
+}
+
+impl Count {
+    /// create a new counter wrapper around this metric
+    pub fn new(inner: Arc<SQLMetric>) -> Self {
+        Self { inner }
+    }
+
+    /// Add `n` to the counter's value
+    pub fn add(&self, n: usize) {
+        self.inner.add(n)
+    }
+}
+
+/// a SQLMetric wrapper for CPU timing information
+#[derive(Debug, Clone)]
+pub struct Time {
+    inner: Arc<SQLMetric>,
+}
+
+impl Time {
+    /// Create a new [`Time`] wrapper suitable for recording elapsed
+    /// times for operations.
+    pub fn new(inner: Arc<SQLMetric>) -> Self {

Review comment:
       I wonder if these should be verifying the MetricKind of the SQLMetric? Or alternatively be private and have a member function on SQLMetric that does the verification
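
One way the verification suggested here might look, as a rough sketch only. The types below are cut-down stand-ins for the PR's `SQLMetric`, `MetricKind`, and `Time`; `try_new` and `add_nanos` are hypothetical names, and accepting every `Custom` kind is an assumption made for illustration (the PR builds `subset_time` timers from `MetricKind::Custom`).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Cut-down copy of the PR's `MetricKind`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricKind {
    OutputRows,
    CPUTime,
    Custom(Arc<str>),
}

/// Cut-down stand-in for the PR's `SQLMetric`.
#[derive(Debug)]
pub struct SQLMetric {
    value: AtomicUsize,
    kind: MetricKind,
}

impl SQLMetric {
    pub fn new(kind: MetricKind) -> Self {
        Self {
            value: AtomicUsize::new(0),
            kind,
        }
    }

    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }

    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }
}

/// Timer wrapper whose constructor checks the kind of the wrapped metric.
#[derive(Debug, Clone)]
pub struct Time {
    inner: Arc<SQLMetric>,
}

impl Time {
    /// Hypothetical checked constructor: rejects metrics that cannot
    /// hold a duration. Assumption: any `Custom` kind is accepted.
    pub fn try_new(inner: Arc<SQLMetric>) -> Result<Self, String> {
        let is_time_like = matches!(
            inner.kind,
            MetricKind::CPUTime | MetricKind::Custom(_)
        );
        if is_time_like {
            Ok(Self { inner })
        } else {
            Err(format!("cannot wrap a {:?} metric in Time", inner.kind))
        }
    }

    /// Record `nanos` nanoseconds of elapsed time.
    pub fn add_nanos(&self, nanos: usize) {
        self.inner.add(nanos);
    }
}
```

The alternative mentioned above, a private constructor plus a method on `SQLMetric`, would move the same check into the metric type instead of the wrapper.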







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r691527618



##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If None, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),

Review comment:
       done







[GitHub] [arrow-datafusion] alamb closed pull request #901: Implement new metrics API / RFC

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901


   





[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r690573748



##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: Arc<SharedMetricsSet>,

Review comment:
       Why is this an Arc?

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If None, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {

Review comment:
       I like that metric recording doesn't involve any string manipulation :+1:
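
As a rough illustration of why the recording path stays cheap: once a counter exists, `add` is a single relaxed atomic increment, and clones of the wrapper share the same underlying value. The sketch below uses simplified stand-ins for the PR's `SQLMetric` and `Count`; `Metric`, `Count::value`, and `record_from_threads` are hypothetical names added for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simplified stand-in for the PR's `SQLMetric`: just the atomic value.
#[derive(Debug, Default)]
pub struct Metric {
    value: AtomicUsize,
}

/// Counter wrapper; clones update the same underlying metric.
#[derive(Debug, Clone)]
pub struct Count {
    inner: Arc<Metric>,
}

impl Count {
    pub fn new(inner: Arc<Metric>) -> Self {
        Self { inner }
    }

    /// Hot path: one atomic add, no strings, no locks.
    pub fn add(&self, n: usize) {
        self.inner.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Current value; can be read while recording is still in progress.
    pub fn value(&self) -> usize {
        self.inner.value.load(Ordering::Relaxed)
    }
}

/// Hypothetical helper: `threads` workers each add 1 `per_thread` times.
pub fn record_from_threads(count: &Count, threads: usize, per_thread: usize) {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = count.clone();
            thread::spawn(move || {
                for _ in 0..per_thread {
                    c.add(1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

The names and labels are paid for once, when the metric is built; after that, each worker only holds an `Arc` to the atomic.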

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If `None`, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),

Review comment:
       I wonder if this should just be `&'static str` and MetricKind made Copy
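
A minimal sketch of what this suggestion could look like, assuming custom metric names are always known at compile time. The enum below is a hypothetical variant, not the code in this PR; the trade-off is that names built at runtime (for example with `format!`) would no longer fit without leaking the string.

```rust
/// Hypothetical variant of `MetricKind` that stores a `&'static str`
/// in `Custom`, which allows the whole enum to derive `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricKind {
    /// Number of output rows produced
    OutputRows,
    /// CPU time
    CPUTime,
    /// User defined kind; the name must live for the whole program
    Custom(&'static str),
}

impl MetricKind {
    /// Return a user displayable name for this kind of metric
    pub fn name(&self) -> &'static str {
        // `*self` is a cheap copy, so `name` binds as `&'static str`
        match *self {
            MetricKind::OutputRows => "outputRows",
            MetricKind::CPUTime => "cpuTime",
            MetricKind::Custom(name) => name,
        }
    }
}
```

With `Copy`, call sites such as the key construction in `aggregate_by_partition` would not need an explicit `clone()` for the kind.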

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If `None`, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with `Partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumuate with no partition

Review comment:
       ```suggestion
                       // accumulate with no partition
   ```

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If `None`, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add the standard name for output rows
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with `Partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumuate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {
+    inner: Mutex<MetricsSet>,
+}
+
+impl SharedMetricsSet {
+    /// Create a new empty shared metrics set
+    pub fn new() -> Self {
+        Self {
+            inner: Mutex::new(MetricsSet::new()),
+        }
+    }
+
+    /// Add the specified metric
+    pub fn register(&self, metric: Arc<SQLMetric>) {
+        self.inner.lock().expect("not poisoned").push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s for this `ExecutionPlan` to the
+    /// specified array.
+    pub fn extend_other(&self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        self.inner
+            .lock()
+            .expect("not poisoned")
+            .extend_other(metrics)
+    }
+
+    /// Return a clone of the inner MetricsSet
+    pub fn clone_inner(&self) -> MetricsSet {
+        let guard = self.inner.lock().expect("not poisoned");
+        (*guard).clone()
+    }
+}
+
+/// name=value pairs identifying a metric. This concept is called various things
+/// in various different systems:
+///
+/// "labels" in
+/// [prometheus](https://prometheus.io/docs/concepts/data_model/) and
+/// "tags" in
+/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+/// , "attributes" in [open
+/// telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md),
+/// etc.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Label {
+    name: Arc<str>,

Review comment:
       See above comment on `Cow<'static, str>`

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,

Review comment:
       The use of `Arc<str>` for this, and for labels etc., is a little surprising to me. I'd expect these to be static strings in most cases, except in a couple of places where they might be unique dynamic strings (e.g. the output of `format!`). Perhaps `Cow<'static, str>` would be a better fit, since it avoids allocations for static strings?
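
To make the suggestion concrete, here is a rough sketch of a `Label` built on `Cow<'static, str>`. It is an assumption about how the idea could be applied, not the code in this PR; the accessors and the `is_borrowed` helper are hypothetical and exist only for illustration.

```rust
use std::borrow::Cow;

/// Hypothetical `Label` that stores `Cow<'static, str>` instead of `Arc<str>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Accepts both `&'static str` (no allocation) and `String` (owned)
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// Name of this label
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Value of this label
    pub fn value(&self) -> &str {
        &self.value
    }

    /// Illustration only: true if both fields borrow static strings,
    /// meaning no heap allocation was needed to build this label
    pub fn is_borrowed(&self) -> bool {
        matches!(
            (&self.name, &self.value),
            (Cow::Borrowed(_), Cow::Borrowed(_))
        )
    }
}
```

One design point to weigh: cloning a `Cow::Owned` copies the string, whereas cloning an `Arc<str>` only bumps a reference count, so the better choice depends on how often dynamic labels are cloned (as `aggregate_by_partition` and the `Display` impl do in this PR).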

##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,
+        value: impl Into<Arc<str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self and create a metric of the specified kind
+    /// registered with the MetricsSet
+    pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels));
+        metrics.register(metric.clone());
+        metric
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let metric = self.with_partition(partition).build(MetricKind::OutputRows);
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an operator
+    pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consumes self and creates a new Counter for recording
+    /// some metric of an overall operator (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count {
+        let metric = self.build(MetricKind::Custom(counter_name.into()));
+        Count::new(metric)
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let metric = self.with_partition(partition).build(MetricKind::CPUTime);
+        Time::new(metric)
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time
+    pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time {
+        let metric = self
+            .with_partition(partition)
+            .build(MetricKind::Custom(subset_name.into()));
+        Time::new(metric)
+    }
+}
+
+/// Something that tracks the metrics of an execution using an atomic
+/// usize
+#[derive(Debug)]
+pub struct SQLMetric {
+    /// value of the metric
+    value: AtomicUsize,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? If `None`, the metric applies to all partitions.
+    partition: Option<usize>,
+
+    /// The kind of metric (how to logically interpret the value)
+    kind: MetricKind,
+}
+
+impl Display for SQLMetric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.kind)?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| {
+                Label::new(
+                    Arc::from("partition"),
+                    Arc::from(partition.to_string().as_str()),
+                )
+            })
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        let format_as_duration = match &self.kind {
+            MetricKind::OutputRows => false,
+            MetricKind::CPUTime => true,
+            MetricKind::Custom(name) => name.contains("Time") || name.contains("time"),
+        };
+
+        if format_as_duration {
+            let duration = std::time::Duration::from_nanos(self.value() as u64);
+            write!(f, "={:?}", duration)
+        } else {
+            write!(f, "={}", self.value())
+        }
+    }
+}
+
+impl SQLMetric {
+    /// Create a new SQLMetric
+    pub fn new(kind: MetricKind, partition: Option<usize>) -> Self {
+        Self {
+            value: 0.into(),
+            labels: vec![],
+            partition,
+            kind,
+        }
+    }
+
+    /// Create a new SQLMetric with the specified labels
+    pub fn new_with_labels(
+        kind: MetricKind,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value: 0.into(),
+            labels,
+            partition,
+            kind,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+
+    /// get the kind of this metric
+    pub fn kind(&self) -> &MetricKind {
+        &self.kind
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n`
+    pub fn set(&self, n: usize) {
+        self.value.store(n, Ordering::Relaxed);
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// How should the value of the metric be interpreted?
+pub enum MetricKind {
+    /// Number of output rows produced
+    OutputRows,
+    /// CPU time
+    CPUTime,
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+    /// Arbitrary user defined type
+    Custom(Arc<str>),
+}
+
+impl Display for MetricKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl MetricKind {
+    /// return a user displayable name of this kind of metric
+    pub fn name(&self) -> &str {
+        match self {
+            MetricKind::OutputRows => "outputRows",
+            MetricKind::Custom(name) => name,
+            MetricKind::CPUTime => "cpuTime",
+        }
+    }
+}
+
+/// A set of SQLMetrics for a particular operator
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<SQLMetric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<SQLMetric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Add all [`SQLMetric`]s in this set to the specified array.
+    fn extend_other(&mut self, metrics: &mut Vec<Arc<SQLMetric>>) {
+        metrics.extend(self.metrics.iter().cloned())
+    }
+
+    /// convenience: return the number of rows produced, aggregated
+    /// across partitions or None if no metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::OutputRows))
+    }
+
+    /// convenience: return the amount of CPU time spent, aggregated
+    /// across partitions or None if no metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.kind(), MetricKind::CPUTime))
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<usize>
+    where
+        F: FnMut(&SQLMetric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        if iter.peek().is_none() {
+            None
+        } else {
+            Some(iter.map(|metric| metric.value()).sum())
+        }
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had partition=`Some(..)` have been aggregated
+    /// together. The resulting `MetricsSet` has all metrics with partition=`None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.kind.clone(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut SQLMetric| {
+                    accum.set(accum.value() + metric.value())
+                })
+                .or_insert_with(|| {
+                    // accumulate with no partition
+                    let partition = None;
+                    let accum = SQLMetric::new_with_labels(
+                        metric.kind().clone(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.set(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of SQLMetrics that can be added to as partitions
+/// execute. Designed to be a convenience for operator implementation
+#[derive(Default, Debug)]
+pub struct SharedMetricsSet {

Review comment:
       The name of this confused me for a while. At least on my first pass, `Shared` made me think "shared between operators", which clearly isn't the case. I don't really have a better suggestion for a name, but thought I'd mention it. Maybe `OperatorMetricsSet` or something, I don't know...







[GitHub] [arrow-datafusion] alamb commented on pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-901369459


   I have some non-trivial feedback to incorporate, so I am marking this as a draft (and maybe I will open a new PR after updating the proposal)





[GitHub] [arrow-datafusion] alamb commented on pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#issuecomment-899754600


   cc @tustvold: I would like your opinion on the suitability of the metrics.rs API in this PR and its consistency with other metric APIs





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r691444996



##########
File path: datafusion/src/physical_plan/metrics/wrappers.rs
##########
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//!  wrappers for `SQLMetrics` for more conveniently recording execution metrics
+
+use std::{sync::Arc, time::Instant};
+
+use super::SQLMetric;
+
+// pub trait MetricSource {
+//     /// Return the underlying metric
+//     pub fn metric(&self) -> Arc<SQLMetric>;
+// }
+
+/// a SQLMetric wrapper for a counter (number of input or output rows)
+///
+/// Note: cloned counters update the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Count {
+    inner: Arc<SQLMetric>,
+}
+
+impl Count {
+    /// create a new counter wrapper around this metric
+    pub fn new(inner: Arc<SQLMetric>) -> Self {
+        Self { inner }
+    }
+
+    /// Add `n` to the counter's value
+    pub fn add(&self, n: usize) {
+        self.inner.add(n)
+    }
+}
+
+/// a SQLMetric wrapper for CPU timing information
+#[derive(Debug, Clone)]
+pub struct Time {
+    inner: Arc<SQLMetric>,
+}
+
+impl Time {
+    /// Create a new [`Time`] wrapper suitable for recording elapsed
+    /// times for operations.
+    pub fn new(inner: Arc<SQLMetric>) -> Self {

Review comment:
       This is a good point. 🤔 Though now that I think about it, the more Rust-y way of doing this would be to do it in the type system... so perhaps I will collapse `MetricKind` 🤔
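
       To make the "type system" idea concrete, here is one rough sketch under stated assumptions. `MetricValue`, its variants, `render`, and the self-contained `Count` / `Time` types below are all hypothetical; they are not the wrappers from this PR and not necessarily the design that will be adopted:

```rust
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Hypothetical counter that owns its value (clones share the same atomic).
#[derive(Debug, Clone, Default)]
pub struct Count {
    value: Arc<AtomicUsize>,
}

impl Count {
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// Hypothetical timer that stores elapsed nanoseconds.
#[derive(Debug, Clone, Default)]
pub struct Time {
    nanos: Arc<AtomicUsize>,
}

impl Time {
    pub fn add_duration(&self, d: Duration) {
        self.nanos.fetch_add(d.as_nanos() as usize, Ordering::Relaxed);
    }
    pub fn value(&self) -> Duration {
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed) as u64)
    }
}

/// Kind and value collapsed into one enum: each variant carries a value of
/// the right type, so a timer can never be wrapped around a row counter.
#[derive(Debug, Clone)]
pub enum MetricValue {
    OutputRows(Count),
    CpuTime(Time),
    CustomCount { name: Cow<'static, str>, count: Count },
    CustomTime { name: Cow<'static, str>, time: Time },
}

impl MetricValue {
    pub fn name(&self) -> &str {
        match self {
            MetricValue::OutputRows(_) => "outputRows",
            MetricValue::CpuTime(_) => "cpuTime",
            MetricValue::CustomCount { name, .. }
            | MetricValue::CustomTime { name, .. } => name.as_ref(),
        }
    }

    /// Formats as `name=value`; the variant, not the name, decides whether
    /// the value is shown as a count or as a duration.
    pub fn render(&self) -> String {
        match self {
            MetricValue::OutputRows(c) | MetricValue::CustomCount { count: c, .. } => {
                format!("{}={}", self.name(), c.value())
            }
            MetricValue::CpuTime(t) | MetricValue::CustomTime { time: t, .. } => {
                format!("{}={:?}", self.name(), t.value())
            }
        }
    }
}
```

       With a shape like this, formatting would no longer need to inspect a custom metric's name for "Time" / "time" to decide whether to print a duration.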

##########
File path: ballista/rust/core/src/execution_plans/shuffle_writer.rs
##########
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: Arc<SharedMetricsSet>,

Review comment:
       I think I was just copy/pasting the usage from operators (like Parquet) that did need to `clone` their metrics. Upon further reflection, I think it is simpler to move the `Arc` inside and have the external interface simply be `SharedMetricsSet`. I have done so.
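
       For readers following along, "moving the `Arc` inside" could look roughly like the sketch below. `Metric` is a simplified, hypothetical stand-in for `SQLMetric`, and the `len` / `sum` helpers are assumptions added for illustration; only `register` mirrors a call that appears in the diff:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for `SQLMetric` (assumption: just a name and a value).
#[derive(Debug)]
pub struct Metric {
    name: &'static str,
    value: AtomicUsize,
}

impl Metric {
    pub fn new(name: &'static str) -> Self {
        Self { name, value: AtomicUsize::new(0) }
    }
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// The `Arc` lives inside the struct, so callers hold a plain
/// `SharedMetricsSet` and every clone points at the same metric list.
#[derive(Debug, Default, Clone)]
pub struct SharedMetricsSet {
    inner: Arc<Mutex<Vec<Arc<Metric>>>>,
}

impl SharedMetricsSet {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a metric; needs only `&self` thanks to the inner mutex.
    pub fn register(&self, metric: Arc<Metric>) {
        self.inner.lock().expect("lock poisoned").push(metric);
    }

    /// Number of registered metrics (illustrative helper).
    pub fn len(&self) -> usize {
        self.inner.lock().expect("lock poisoned").len()
    }

    /// Sum the current values of all metrics with the given name,
    /// or `None` if no metric matches (illustrative helper).
    pub fn sum(&self, name: &str) -> Option<usize> {
        let guard = self.inner.lock().expect("lock poisoned");
        let mut matching = guard.iter().filter(|m| m.name == name).peekable();
        if matching.peek().is_none() {
            None
        } else {
            Some(matching.map(|m| m.value()).sum())
        }
    }
}
```

       An operator struct can then store `metrics: SharedMetricsSet` directly and hand out clones to per-partition streams without exposing the `Arc` in its field type.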







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #901: Implement new metrics API

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #901:
URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r691527848



##########
File path: datafusion/src/physical_plan/metrics.rs
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+pub mod wrappers;
+
+use std::{
+    fmt::{Debug, Display},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+};
+
+use hashbrown::HashMap;
+
+use self::wrappers::{Count, Time};
+
+/// Structure for constructing metrics, counters, timers, etc
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a SharedMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a SharedMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Arc<str>>,

Review comment:
       Changed to `Cow<'static, str>` as suggested



