Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/24 15:05:59 UTC

[arrow-datafusion] branch master updated: Improve SQLMetric APIs, port existing metrics (#908)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bd49b86  Improve SQLMetric APIs, port existing metrics (#908)
bd49b86 is described below

commit bd49b86c48b62d3b215fa30e6fabb468928fd895
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Aug 24 11:05:51 2021 -0400

    Improve SQLMetric APIs, port existing metrics (#908)
---
 .../core/src/execution_plans/shuffle_reader.rs     |  24 +-
 .../core/src/execution_plans/shuffle_writer.rs     |  60 +--
 ballista/rust/core/src/utils.rs                    |  12 +-
 datafusion/src/physical_optimizer/repartition.rs   |  13 +-
 datafusion/src/physical_plan/analyze.rs            |   8 +
 datafusion/src/physical_plan/display.rs            |  69 ++-
 datafusion/src/physical_plan/hash_aggregate.rs     |  32 +-
 datafusion/src/physical_plan/hash_join.rs          | 100 ++--
 datafusion/src/physical_plan/metrics/builder.rs    | 150 ++++++
 datafusion/src/physical_plan/metrics/mod.rs        | 528 +++++++++++++++++++++
 datafusion/src/physical_plan/metrics/value.rs      | 261 ++++++++++
 datafusion/src/physical_plan/mod.rs                | 114 +----
 datafusion/src/physical_plan/parquet.rs            | 131 +++--
 datafusion/src/physical_plan/repartition.rs        |  90 ++--
 datafusion/src/physical_plan/sort.rs               |  44 +-
 datafusion/tests/parquet_pruning.rs                |  68 +--
 datafusion/tests/sql.rs                            |   6 +-
 17 files changed, 1320 insertions(+), 390 deletions(-)
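
For orientation, here is the shape of the change: operators previously stored
individual `Arc<SQLMetric>` fields and returned a `HashMap<String, SQLMetric>`
from `ExecutionPlan::metrics()`; they now hold a single shared
`ExecutionPlanMetricsSet` and create named, per-partition metrics on demand
with `MetricBuilder`. A minimal sketch of the new pattern (the operator
`MyExec` is hypothetical, not part of this commit):

    use datafusion::physical_plan::metrics::{
        ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
    };

    struct MyExec {
        /// One shared set; clones all feed the same underlying metrics
        metrics: ExecutionPlanMetricsSet,
    }

    impl MyExec {
        fn execute(&self, partition: usize) {
            // Register named metrics for this partition on demand
            let rows = MetricBuilder::new(&self.metrics).output_rows(partition);
            let fetch = MetricBuilder::new(&self.metrics)
                .subset_time("fetch_time", partition);

            let timer = fetch.timer(); // starts timing
            // ... do the partition's work ...
            timer.done();              // stops and records elapsed nanoseconds
            rows.add(100);
        }

        // The trait method now returns Option<MetricsSet>
        fn metrics(&self) -> Option<MetricsSet> {
            Some(self.metrics.clone_inner())
        }
    }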

diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 0447ca9..bc5dbc1 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,9 +28,10 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+use datafusion::physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
@@ -47,8 +48,8 @@ pub struct ShuffleReaderExec {
     /// Each partition of a shuffle can read data from multiple locations
     pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
-    /// Time to fetch data from executor
-    fetch_time: Arc<SQLMetric>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ShuffleReaderExec {
@@ -60,7 +61,7 @@ impl ShuffleReaderExec {
         Ok(Self {
             partition,
             schema,
-            fetch_time: SQLMetric::time_nanos(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 }
@@ -100,13 +101,16 @@ impl ExecutionPlan for ShuffleReaderExec {
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
 
-        let start = Instant::now();
+        let fetch_time =
+            MetricBuilder::new(&self.metrics).subset_time("fetch_time", partition);
+        let timer = fetch_time.timer();
+
         let partition_locations = &self.partition[partition];
         let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
-        self.fetch_time.add_elapsed(start);
+        timer.done();
 
         let result = WrappedStream::new(
             Box::pin(futures::stream::iter(result).flatten()),
@@ -149,10 +153,8 @@ impl ExecutionPlan for ShuffleReaderExec {
         }
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }
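
The `Instant::now()` / `add_elapsed` pair above becomes a scoped timer guard.
A small sketch of the guard's semantics (see `ScopedTimerGuard` in
metrics/value.rs further down):

    use datafusion::physical_plan::metrics::Time;

    let fetch_time = Time::new();

    let timer = fetch_time.timer(); // timing starts at creation
    // ... fetch the partition data ...
    timer.done();                   // stops and records explicitly

    let timer = fetch_time.timer(); // measured time need not be contiguous
    drop(timer);                    // dropping the guard also records

    let _total_nanos = fetch_time.value(); // accumulated nanoseconds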
 
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index b1db21f..36e445b 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -45,10 +45,13 @@ use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::metrics::{
+    self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
 };
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 #[derive(Debug, Clone)]
 struct ShuffleWriteMetrics {
     /// Time spent writing batches to shuffle files
-    write_time: Arc<SQLMetric>,
-    input_rows: Arc<SQLMetric>,
-    output_rows: Arc<SQLMetric>,
+    write_time: metrics::Time,
+    input_rows: metrics::Count,
+    output_rows: metrics::Count,
 }
 
 impl ShuffleWriteMetrics {
-    fn new() -> Self {
+    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
         Self {
-            write_time: SQLMetric::time_nanos(),
-            input_rows: SQLMetric::counter(),
-            output_rows: SQLMetric::counter(),
+            write_time,
+            input_rows,
+            output_rows,
         }
     }
 }
@@ -108,7 +117,7 @@ impl ShuffleWriterExec {
             plan,
             work_dir,
             shuffle_output_partitioning,
-            metrics: ShuffleWriteMetrics::new(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -139,9 +148,11 @@ impl ShuffleWriterExec {
         path.push(&self.job_id);
         path.push(&format!("{}", self.stage_id));
 
+        let write_metrics = ShuffleWriteMetrics::new(input_partition, &self.metrics);
+
         match &self.shuffle_output_partitioning {
             None => {
-                let start = Instant::now();
+                let timer = write_metrics.write_time.timer();
                 path.push(&format!("{}", input_partition));
                 std::fs::create_dir_all(&path)?;
                 path.push("data.arrow");
@@ -152,18 +163,18 @@ impl ShuffleWriterExec {
                 let stats = utils::write_stream_to_disk(
                     &mut stream,
                     path,
-                    self.metrics.write_time.clone(),
+                    &write_metrics.write_time,
                 )
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
-                self.metrics
+                write_metrics
                     .input_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics
+                write_metrics
                     .output_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics.write_time.add_elapsed(start);
+                timer.done();
 
                 info!(
                     "Executed partition {} in {} seconds. Statistics: {}",
@@ -197,7 +208,7 @@ impl ShuffleWriterExec {
                 while let Some(result) = stream.next().await {
                     let input_batch = result?;
 
-                    self.metrics.input_rows.add(input_batch.num_rows());
+                    write_metrics.input_rows.add(input_batch.num_rows());
 
                     let arrays = exprs
                         .iter()
@@ -239,7 +250,7 @@ impl ShuffleWriterExec {
 
                         //TODO optimize so we don't write or fetch empty partitions
                         //if output_batch.num_rows() > 0 {
-                        let start = Instant::now();
+                        let timer = write_metrics.write_time.timer();
                         match &mut writers[output_partition] {
                             Some(w) => {
                                 w.write(&output_batch)?;
@@ -260,9 +271,8 @@ impl ShuffleWriterExec {
                                 writers[output_partition] = Some(writer);
                             }
                         }
-                        self.metrics.output_rows.add(output_batch.num_rows());
-                        self.metrics.write_time.add_elapsed(start);
-                        //}
+                        write_metrics.output_rows.add(output_batch.num_rows());
+                        timer.done();
                     }
                 }
 
@@ -388,12 +398,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
-        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
-        metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 
     fn fmt_as(
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d753f70..bf0d152 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -61,7 +61,7 @@ use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
+    metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
 };
 use futures::{future, Stream, StreamExt};
 use std::time::Instant;
@@ -71,7 +71,7 @@ use std::time::Instant;
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
-    disk_write_metric: Arc<SQLMetric>,
+    disk_write_metric: &metrics::Time,
 ) -> Result<PartitionStats> {
     let file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
@@ -97,13 +97,13 @@ pub async fn write_stream_to_disk(
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;
 
-        let start = Instant::now();
+        let timer = disk_write_metric.timer();
         writer.write(&batch)?;
-        disk_write_metric.add_elapsed(start);
+        timer.done();
     }
-    let start = Instant::now();
+    let timer = disk_write_metric.timer();
     writer.finish()?;
-    disk_write_metric.add_elapsed(start);
+    timer.done();
     Ok(PartitionStats::new(
         Some(num_rows as u64),
         Some(num_batches),
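
`write_stream_to_disk` now borrows a `metrics::Time` rather than taking an
`Arc<SQLMetric>`; since `Time` internally wraps an `Arc<AtomicUsize>`, callers
that need ownership can also clone it cheaply and both handles update the same
value. A sketch with a hypothetical helper mirroring the new signature:

    use datafusion::physical_plan::metrics::Time;

    // Hypothetical helper, borrowing the metric like write_stream_to_disk
    fn timed_write(disk_write_metric: &Time) {
        let timer = disk_write_metric.timer();
        // ... write batches ...
        timer.done();
    }

    let write_time = Time::new();
    timed_write(&write_time);

    let handle = write_time.clone(); // shares the same underlying counter
    assert_eq!(handle.value(), write_time.value());
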
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 4504c81..984b234 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,24 +110,25 @@ mod tests {
 
     use super::*;
     use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{
-        ParquetExec, ParquetExecMetrics, ParquetPartition,
-    };
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
     use crate::physical_plan::projection::ProjectionExec;
 
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
                 vec![ParquetPartition::new(
                     vec!["x".to_string()],
                     Statistics::default(),
+                    metrics.clone(),
                 )],
                 schema,
                 None,
-                ParquetExecMetrics::new(),
+                metrics,
                 None,
                 2048,
                 None,
@@ -154,6 +155,7 @@ mod tests {
     #[test]
     fn repartition_deepest_node() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ProjectionExec::try_new(
@@ -162,10 +164,11 @@ mod tests {
                     vec![ParquetPartition::new(
                         vec!["x".to_string()],
                         Statistics::default(),
+                        metrics.clone(),
                     )],
                     schema,
                     None,
-                    ParquetExecMetrics::new(),
+                    metrics,
                     None,
                     2048,
                     None,
diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs
index 36726ad..d012557 100644
--- a/datafusion/src/physical_plan/analyze.rs
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -164,6 +164,14 @@ impl ExecutionPlan for AnalyzeExec {
             // Verbose output
             // TODO make this more sophisticated
             if verbose {
+                type_builder.append_value("Plan with Full Metrics").unwrap();
+
+                let annotated_plan =
+                    DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
+                        .indent()
+                        .to_string();
+                plan_builder.append_value(annotated_plan).unwrap();
+
                 type_builder.append_value("Output Rows").unwrap();
                 plan_builder.append_value(total_rows.to_string()).unwrap();
 
diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs
index e251e4e..5ff99e5 100644
--- a/datafusion/src/physical_plan/display.rs
+++ b/datafusion/src/physical_plan/display.rs
@@ -35,8 +35,8 @@ pub enum DisplayFormatType {
 /// Wraps an `ExecutionPlan` with various ways to display this plan
 pub struct DisplayableExecutionPlan<'a> {
     inner: &'a dyn ExecutionPlan,
-    /// whether to show metrics or not
-    with_metrics: bool,
+    /// How to show metrics
+    show_metrics: ShowMetrics,
 }
 
 impl<'a> DisplayableExecutionPlan<'a> {
@@ -45,16 +45,27 @@ impl<'a> DisplayableExecutionPlan<'a> {
     pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
         Self {
             inner,
-            with_metrics: false,
+            show_metrics: ShowMetrics::None,
         }
     }
 
     /// Create a wrapper around an [`ExecutionPlan`] which can be
-    /// pretty printed in a variety of ways
+    /// pretty printed in a variety of ways that also shows aggregated
+    /// metrics
     pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
         Self {
             inner,
-            with_metrics: true,
+            show_metrics: ShowMetrics::Aggregated,
+        }
+    }
+
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
+    /// pretty printed in a variety of ways that also shows all
+    /// low level metrics
+    pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
+        Self {
+            inner,
+            show_metrics: ShowMetrics::Full,
         }
     }
 
@@ -71,7 +82,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
     pub fn indent(&self) -> impl fmt::Display + 'a {
         struct Wrapper<'a> {
             plan: &'a dyn ExecutionPlan,
-            with_metrics: bool,
+            show_metrics: ShowMetrics,
         }
         impl<'a> fmt::Display for Wrapper<'a> {
             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -80,18 +91,30 @@ impl<'a> DisplayableExecutionPlan<'a> {
                     t,
                     f,
                     indent: 0,
-                    with_metrics: self.with_metrics,
+                    show_metrics: self.show_metrics,
                 };
                 accept(self.plan, &mut visitor)
             }
         }
         Wrapper {
             plan: self.inner,
-            with_metrics: self.with_metrics,
+            show_metrics: self.show_metrics,
         }
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+enum ShowMetrics {
+    /// Do not show any metrics
+    None,
+
+    /// Show aggregated metrics across all partitions
+    Aggregated,
+
+    /// Show full per-partition metrics
+    Full,
+}
+
 /// Formats plans with a single line per node.
 struct IndentVisitor<'a, 'b> {
     /// How to format each node
@@ -100,8 +123,8 @@ struct IndentVisitor<'a, 'b> {
     f: &'a mut fmt::Formatter<'b>,
     /// Indent size
     indent: usize,
-    /// whether to show metrics or not
-    with_metrics: bool,
+    /// How to show metrics
+    show_metrics: ShowMetrics,
 }
 
 impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -112,16 +135,22 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
     ) -> std::result::Result<bool, Self::Error> {
         write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
         plan.fmt_as(self.t, self.f)?;
-        if self.with_metrics {
-            write!(
-                self.f,
-                ", metrics=[{}]",
-                plan.metrics()
-                    .iter()
-                    .map(|(k, v)| format!("{}={:?}", k, v.value))
-                    .collect::<Vec<_>>()
-                    .join(", ")
-            )?;
+        match self.show_metrics {
+            ShowMetrics::None => {}
+            ShowMetrics::Aggregated => {
+                if let Some(metrics) = plan.metrics() {
+                    write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?;
+                } else {
+                    write!(self.f, ", metrics=[]")?;
+                }
+            }
+            ShowMetrics::Full => {
+                if let Some(metrics) = plan.metrics() {
+                    write!(self.f, ", metrics=[{}]", metrics)?;
+                } else {
+                    write!(self.f, ", metrics=[]")?;
+                }
+            }
         }
         writeln!(self.f)?;
         self.indent += 1;
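
Consumers now choose the level of detail when rendering a plan: `Aggregated`
sums per-partition metrics by name (via `aggregate_by_partition`, defined in
metrics/mod.rs below), while `Full` prints every raw metric. A sketch,
assuming `DisplayableExecutionPlan` is importable via the display module, as
analyze.rs above uses it:

    use datafusion::physical_plan::display::DisplayableExecutionPlan;
    use datafusion::physical_plan::ExecutionPlan;

    fn show(plan: &dyn ExecutionPlan) {
        // One entry per metric name, summed across partitions,
        // e.g. "metrics=[output_rows=42, ...]"
        println!("{}", DisplayableExecutionPlan::with_metrics(plan).indent());

        // Every raw metric, e.g. "metrics=[output_rows{partition=1}=13, ...]"
        println!("{}", DisplayableExecutionPlan::with_full_metrics(plan).indent());
    }
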
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 1c07f61..5a2ec9e 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -27,13 +27,12 @@ use futures::{
     stream::{Stream, StreamExt},
     Future,
 };
-use hashbrown::HashMap;
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
-    Partitioning, PhysicalExpr, SQLMetric,
+    Partitioning, PhysicalExpr,
 };
 use crate::scalar::ScalarValue;
 
@@ -51,6 +50,7 @@ use pin_project_lite::pin_project;
 
 use async_trait::async_trait;
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
 
 /// Hash aggregate modes
@@ -86,8 +86,8 @@ pub struct HashAggregateExec {
     /// same as input.schema() but for the final aggregate it will be the same as the input
     /// to the partial aggregate
     input_schema: SchemaRef,
-    /// Metric to track number of output rows
-    output_rows: Arc<SQLMetric>,
+    /// Execution Metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 fn create_schema(
@@ -136,8 +136,6 @@ impl HashAggregateExec {
 
         let schema = Arc::new(schema);
 
-        let output_rows = SQLMetric::counter();
-
         Ok(HashAggregateExec {
             mode,
             group_expr,
@@ -145,7 +143,7 @@ impl HashAggregateExec {
             input,
             schema,
             input_schema,
-            output_rows,
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -209,6 +207,8 @@ impl ExecutionPlan for HashAggregateExec {
         let input = self.input.execute(partition).await?;
         let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
 
+        let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
+
         if self.group_expr.is_empty() {
             Ok(Box::pin(HashAggregateStream::new(
                 self.mode,
@@ -223,7 +223,7 @@ impl ExecutionPlan for HashAggregateExec {
                 group_expr,
                 self.aggr_expr.clone(),
                 input,
-                self.output_rows.clone(),
+                output_rows,
             )))
         }
     }
@@ -246,10 +246,8 @@ impl ExecutionPlan for HashAggregateExec {
         }
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 
     fn fmt_as(
@@ -317,7 +315,7 @@ pin_project! {
         #[pin]
         output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
         finished: bool,
-        output_rows: Arc<SQLMetric>,
+        output_rows: metrics::Count,
     }
 }
 
@@ -526,7 +524,7 @@ impl GroupedHashAggregateStream {
         group_expr: Vec<Arc<dyn PhysicalExpr>>,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         input: SendableRecordBatchStream,
-        output_rows: Arc<SQLMetric>,
+        output_rows: metrics::Count,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -1072,9 +1070,9 @@ mod tests {
 
         assert_batches_sorted_eq!(&expected, &result);
 
-        let metrics = merged_aggregate.metrics();
-        let output_rows = metrics.get("outputRows").unwrap();
-        assert_eq!(3, output_rows.value());
+        let metrics = merged_aggregate.metrics().unwrap();
+        let output_rows = metrics.output_rows().unwrap();
+        assert_eq!(3, output_rows);
 
         Ok(())
     }
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 9970824..3a0a3f2 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -35,7 +35,6 @@ use std::{time::Instant, vec};
 
 use async_trait::async_trait;
 use futures::{Stream, StreamExt, TryStreamExt};
-use hashbrown::HashMap;
 use tokio::sync::Mutex;
 
 use arrow::array::Array;
@@ -51,12 +50,15 @@ use arrow::array::{
 
 use hashbrown::raw::RawTable;
 
-use super::expressions::Column;
 use super::hash_utils::create_hashes;
 use super::{
     coalesce_partitions::CoalescePartitionsExec,
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn},
 };
+use super::{
+    expressions::Column,
+    metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+};
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::JoinType;
 
@@ -65,7 +67,7 @@ use super::{
     SendableRecordBatchStream,
 };
 use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::{PhysicalExpr, SQLMetric};
+use crate::physical_plan::PhysicalExpr;
 use log::debug;
 use std::fmt;
 
@@ -111,33 +113,45 @@ pub struct HashJoinExec {
     random_state: RandomState,
     /// Partitioning mode to use
     mode: PartitionMode,
-    /// Metrics
-    metrics: Arc<HashJoinMetrics>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 /// Metrics for HashJoinExec
 #[derive(Debug)]
 struct HashJoinMetrics {
     /// Total time for joining probe-side batches to the build-side batches
-    join_time: Arc<SQLMetric>,
+    join_time: metrics::Time,
     /// Number of batches consumed by this operator
-    input_batches: Arc<SQLMetric>,
+    input_batches: metrics::Count,
     /// Number of rows consumed by this operator
-    input_rows: Arc<SQLMetric>,
+    input_rows: metrics::Count,
     /// Number of batches produced by this operator
-    output_batches: Arc<SQLMetric>,
+    output_batches: metrics::Count,
     /// Number of rows produced by this operator
-    output_rows: Arc<SQLMetric>,
+    output_rows: metrics::Count,
 }
 
 impl HashJoinMetrics {
-    fn new() -> Self {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
+
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_batches =
+            MetricBuilder::new(metrics).counter("output_rows", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
         Self {
-            join_time: SQLMetric::time_nanos(),
-            input_batches: SQLMetric::counter(),
-            input_rows: SQLMetric::counter(),
-            output_batches: SQLMetric::counter(),
-            output_rows: SQLMetric::counter(),
+            join_time,
+            input_batches,
+            input_rows,
+            output_batches,
+            output_rows,
         }
     }
 }
@@ -187,7 +201,7 @@ impl HashJoinExec {
             build_side: Arc::new(Mutex::new(None)),
             random_state,
             mode: partition_mode,
-            metrics: Arc::new(HashJoinMetrics::new()),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -425,7 +439,7 @@ impl ExecutionPlan for HashJoinExec {
             column_indices,
             self.random_state.clone(),
             visited_left_side,
-            self.metrics.clone(),
+            HashJoinMetrics::new(partition, &self.metrics),
         )))
     }
 
@@ -445,20 +459,8 @@ impl ExecutionPlan for HashJoinExec {
         }
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone());
-        metrics.insert(
-            "inputBatches".to_owned(),
-            (*self.metrics.input_batches).clone(),
-        );
-        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
-        metrics.insert(
-            "outputBatches".to_owned(),
-            (*self.metrics.output_batches).clone(),
-        );
-        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }
 
@@ -522,7 +524,7 @@ struct HashJoinStream {
     /// There is nothing to process anymore and left side is processed in case of left join
     is_exhausted: bool,
     /// Metrics
-    metrics: Arc<HashJoinMetrics>,
+    join_metrics: HashJoinMetrics,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -537,7 +539,7 @@ impl HashJoinStream {
         column_indices: Vec<ColumnIndex>,
         random_state: RandomState,
         visited_left_side: Vec<bool>,
-        metrics: Arc<HashJoinMetrics>,
+        join_metrics: HashJoinMetrics,
     ) -> Self {
         HashJoinStream {
             schema,
@@ -550,7 +552,7 @@ impl HashJoinStream {
             random_state,
             visited_left_side,
             is_exhausted: false,
-            metrics,
+            join_metrics,
         }
     }
 }
@@ -876,7 +878,7 @@ impl Stream for HashJoinStream {
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
-                    let start = Instant::now();
+                    let timer = self.join_metrics.join_time.timer();
                     let result = build_batch(
                         &batch,
                         &self.left_data,
@@ -887,14 +889,12 @@ impl Stream for HashJoinStream {
                         &self.column_indices,
                         &self.random_state,
                     );
-                    self.metrics.input_batches.add(1);
-                    self.metrics.input_rows.add(batch.num_rows());
+                    self.join_metrics.input_batches.add(1);
+                    self.join_metrics.input_rows.add(batch.num_rows());
                     if let Ok((ref batch, ref left_side)) = result {
-                        self.metrics
-                            .join_time
-                            .add(start.elapsed().as_millis() as usize);
-                        self.metrics.output_batches.add(1);
-                        self.metrics.output_rows.add(batch.num_rows());
+                        timer.done();
+                        self.join_metrics.output_batches.add(1);
+                        self.join_metrics.output_rows.add(batch.num_rows());
 
                         match self.join_type {
                             JoinType::Left
@@ -911,7 +911,7 @@ impl Stream for HashJoinStream {
                     Some(result.map(|x| x.0))
                 }
                 other => {
-                    let start = Instant::now();
+                    let timer = self.join_metrics.join_time.timer();
                     // For the left join, produce rows for unmatched rows
                     match self.join_type {
                         JoinType::Left
@@ -928,16 +928,14 @@ impl Stream for HashJoinStream {
                                 self.join_type != JoinType::Semi,
                             );
                             if let Ok(ref batch) = result {
-                                self.metrics.input_batches.add(1);
-                                self.metrics.input_rows.add(batch.num_rows());
+                                self.join_metrics.input_batches.add(1);
+                                self.join_metrics.input_rows.add(batch.num_rows());
                                 if let Ok(ref batch) = result {
-                                    self.metrics
-                                        .join_time
-                                        .add(start.elapsed().as_millis() as usize);
-                                    self.metrics.output_batches.add(1);
-                                    self.metrics.output_rows.add(batch.num_rows());
+                                    self.join_metrics.output_batches.add(1);
+                                    self.join_metrics.output_rows.add(batch.num_rows());
                                 }
                             }
+                            timer.done();
                             self.is_exhausted = true;
                             return Some(result);
                         }
diff --git a/datafusion/src/physical_plan/metrics/builder.rs b/datafusion/src/physical_plan/metrics/builder.rs
new file mode 100644
index 0000000..34392c7
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builder for creating arbitrary metrics
+
+use std::{borrow::Cow, sync::Arc};
+
+use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
+
+/// Structure for constructing metrics, counters, timers, etc.
+///
+/// Note the use of `Cow<..>` is to avoid allocations in the common
+/// case of constant strings
+///
+/// ```rust
+///  use datafusion::physical_plan::metrics::*;
+///
+///  let metrics = ExecutionPlanMetricsSet::new();
+///  let partition = 1;
+///
+///  // Create the standard output_rows metric
+///  let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
+///
+///  // Create a operator specific counter with some labels
+///  let num_bytes = MetricBuilder::new(&metrics)
+///    .with_new_label("filename", "my_awesome_file.parquet")
+///    .counter("num_bytes", partition);
+///
+/// ```
+pub struct MetricBuilder<'a> {
+    /// Location that the metric created by this builder will be added to
+    metrics: &'a ExecutionPlanMetricsSet,
+
+    /// optional partition number
+    partition: Option<usize>,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+    /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
+        Self {
+            metrics,
+            partition: None,
+            labels: vec![],
+        }
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_label(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// Add a label to the metric being constructed
+    pub fn with_new_label(
+        self,
+        name: impl Into<Cow<'static, str>>,
+        value: impl Into<Cow<'static, str>>,
+    ) -> Self {
+        self.with_label(Label::new(name.into(), value.into()))
+    }
+
+    /// Set the partition of the metric being constructed
+    pub fn with_partition(mut self, partition: usize) -> Self {
+        self.partition = Some(partition);
+        self
+    }
+
+    /// Consume self, creating a metric with the specified value
+    /// and registering it with the MetricsSet
+    pub fn build(self, value: MetricValue) {
+        let Self {
+            labels,
+            partition,
+            metrics,
+        } = self;
+        let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
+        metrics.register(metric);
+    }
+
+    /// Consume self and create a new counter for recording output rows
+    pub fn output_rows(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::OutputRows(count.clone()));
+        count
+    }
+
+    /// Consumes self and creates a new [`Count`] for recording some
+    /// arbitrary metric of an operator.
+    pub fn counter(
+        self,
+        counter_name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> Count {
+        self.with_partition(partition).global_counter(counter_name)
+    }
+
+    /// Consumes self and creates a new [`Count`] for recording a
+    /// metric of the operator as a whole (not per partition)
+    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
+        let count = Count::new();
+        self.build(MetricValue::Count {
+            name: counter_name.into(),
+            count: count.clone(),
+        });
+        count
+    }
+
+    /// Consume self and create a new Timer for recording the overall cpu time
+    /// spent by an operator
+    pub fn cpu_time(self, partition: usize) -> Time {
+        let time = Time::new();
+        self.with_partition(partition)
+            .build(MetricValue::CPUTime(time.clone()));
+        time
+    }
+
+    /// Consumes self and creates a new Timer for recording some
+    /// subset of an operator's execution time.
+    pub fn subset_time(
+        self,
+        subset_name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> Time {
+        let time = Time::new();
+        self.with_partition(partition).build(MetricValue::Time {
+            name: subset_name.into(),
+            time: time.clone(),
+        });
+        time
+    }
+}
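
Beyond the doc example above, the builder also supports operator-wide
(non-partitioned) counters and labels; labeled metrics with the same name
remain distinct under `aggregate_by_partition`. A brief sketch:

    use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

    let metrics = ExecutionPlanMetricsSet::new();

    // A counter for the operator as a whole (partition=None)
    let num_files = MetricBuilder::new(&metrics).global_counter("num_files");
    num_files.add(1);

    // Labels keep otherwise identically-named metrics separate,
    // even after aggregate_by_partition()
    let num_bytes = MetricBuilder::new(&metrics)
        .with_new_label("filename", "f1.parquet")
        .counter("num_bytes", 0);
    num_bytes.add(1024);
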
diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs
new file mode 100644
index 0000000..7dd92a4
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -0,0 +1,528 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+mod builder;
+mod value;
+
+use std::{
+    borrow::Cow,
+    fmt::{Debug, Display},
+    sync::{Arc, Mutex},
+};
+
+use hashbrown::HashMap;
+
+// public exports
+pub use builder::MetricBuilder;
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+
+/// Something that tracks a value of interest (metric) of a DataFusion
+/// [`ExecutionPlan`] execution.
+///
+/// Typically [`Metric`]s are not created directly, but instead
+/// are created using [`MetricBuilder`] or methods on
+/// [`ExecutionPlanMetricsSet`].
+///
+/// ```
+///  use datafusion::physical_plan::metrics::*;
+///
+///  let metrics = ExecutionPlanMetricsSet::new();
+///  assert!(metrics.clone_inner().output_rows().is_none());
+///
+///  // Create a counter to increment using the MetricBuilder
+///  let partition = 1;
+///  let output_rows = MetricBuilder::new(&metrics)
+///      .output_rows(partition);
+///
+///  // Counter can be incremented
+///  output_rows.add(13);
+///
+///  // The value can be retrieved directly:
+///  assert_eq!(output_rows.value(), 13);
+///
+///  // As well as from the metrics set
+///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// ```
+
+#[derive(Debug)]
+pub struct Metric {
+    /// The value of the metric
+    value: MetricValue,
+
+    /// arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output does this metric
+    /// apply? `None` means it applies to all partitions.
+    partition: Option<usize>,
+}
+
+impl Display for Metric {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.value.name())?;
+
+        let mut iter = self
+            .partition
+            .iter()
+            .map(|partition| Label::new("partition", partition.to_string()))
+            .chain(self.labels().iter().cloned())
+            .peekable();
+
+        // print out the labels specially
+        if iter.peek().is_some() {
+            write!(f, "{{")?;
+
+            let mut is_first = true;
+            for i in iter {
+                if !is_first {
+                    write!(f, ", ")?;
+                } else {
+                    is_first = false;
+                }
+
+                write!(f, "{}", i)?;
+            }
+
+            write!(f, "}}")?;
+        }
+
+        // and now the value
+        write!(f, "={}", self.value)
+    }
+}
+
+impl Metric {
+    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
+    /// rather than this function directly.
+    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
+        Self {
+            value,
+            labels: vec![],
+            partition,
+        }
+    }
+
+    /// Create a new [`Metric`]. Consider using [`MetricBuilder`]
+    /// rather than this function directly.
+    pub fn new_with_labels(
+        value: MetricValue,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            value,
+            labels,
+            partition,
+        }
+    }
+
+    /// Add a new label to this metric
+    pub fn with(mut self, label: Label) -> Self {
+        self.labels.push(label);
+        self
+    }
+
+    /// What labels are present for this metric?
+    fn labels(&self) -> &[Label] {
+        &self.labels
+    }
+
+    /// return a reference to the value of this metric
+    pub fn value(&self) -> &MetricValue {
+        &self.value
+    }
+
+    /// return a mutable reference to the value of this metric
+    pub fn value_mut(&mut self) -> &mut MetricValue {
+        &mut self.value
+    }
+
+    /// return a reference to the partition
+    pub fn partition(&self) -> &Option<usize> {
+        &self.partition
+    }
+}
+
+/// A snapshot of the metrics for a particular operator (`dyn
+/// ExecutionPlan`).
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    metrics: Vec<Arc<Metric>>,
+}
+
+impl MetricsSet {
+    /// Create a new container of metrics
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Add the specified metric
+    pub fn push(&mut self, metric: Arc<Metric>) {
+        self.metrics.push(metric)
+    }
+
+    /// Returns an iterator across all metrics
+    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
+        self.metrics.iter()
+    }
+
+    /// Convenience: return the number of rows produced, aggregated
+    /// across partitions, or `None` if no such metric is present
+    pub fn output_rows(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
+            .map(|v| v.as_usize())
+    }
+
+    /// Convenience: return the amount of CPU time spent, aggregated
+    /// across partitions, or `None` if no such metric is present
+    pub fn cpu_time(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))
+            .map(|v| v.as_usize())
+    }
+
+    /// Sums the values for metrics for which `f(metric)` returns
+    /// true, and returns the value. Returns None if no metrics match
+    /// the predicate.
+    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
+    where
+        F: FnMut(&Metric) -> bool,
+    {
+        let mut iter = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()))
+            .peekable();
+
+        let mut accum = match iter.peek() {
+            None => {
+                return None;
+            }
+            Some(metric) => metric.value().new_empty(),
+        };
+
+        iter.for_each(|metric| accum.add(metric.value()));
+
+        Some(accum)
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had the same name and partition=`Some(..)` have been
+    /// aggregated together. All metrics in the resulting
+    /// `MetricsSet` have `partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {
+        let mut map = HashMap::new();
+
+        // There are all sorts of ways to make this more efficient
+        for metric in &self.metrics {
+            let key = (metric.value.name(), metric.labels.clone());
+            map.entry(key)
+                .and_modify(|accum: &mut Metric| {
+                    accum.value_mut().add(metric.value());
+                })
+                .or_insert_with(|| {
+                    // accumulate with no partition
+                    let partition = None;
+                    let mut accum = Metric::new_with_labels(
+                        metric.value().new_empty(),
+                        partition,
+                        metric.labels().to_vec(),
+                    );
+                    accum.value_mut().add(metric.value());
+                    accum
+                });
+        }
+
+        let new_metrics = map
+            .into_iter()
+            .map(|(_k, v)| Arc::new(v))
+            .collect::<Vec<_>>();
+
+        Self {
+            metrics: new_metrics,
+        }
+    }
+}
+
+impl Display for MetricsSet {
+    /// format the MetricsSet as a single string
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut is_first = true;
+        for i in self.metrics.iter() {
+            if !is_first {
+                write!(f, ", ")?;
+            } else {
+                is_first = false;
+            }
+
+            write!(f, "{}", i)?;
+        }
+        Ok(())
+    }
+}
+
+/// A set of [`Metric`] for an individual "operator" (e.g. `&dyn
+/// ExecutionPlan`).
+///
+/// This structure is intended as a convenience for [`ExecutionPlan`]
+/// implementations so they can generate different streams for multiple
+/// partitions but easily report them together.
+///
+/// Each `clone()` of this structure will add metrics to the same
+/// underlying metrics set
+#[derive(Default, Debug, Clone)]
+pub struct ExecutionPlanMetricsSet {
+    inner: Arc<Mutex<MetricsSet>>,
+}
+
+impl ExecutionPlanMetricsSet {
+    /// Create a new empty shared metrics set
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(MetricsSet::new())),
+        }
+    }
+
+    /// Add the specified metric to the underlying metric set
+    pub fn register(&self, metric: Arc<Metric>) {
+        self.inner.lock().expect("not poisoned").push(metric)
+    }
+
+    /// Return a clone of the inner MetricsSet
+    pub fn clone_inner(&self) -> MetricsSet {
+        let guard = self.inner.lock().expect("not poisoned");
+        (*guard).clone()
+    }
+}
+
+/// name=value pairs identifying a metric. This concept is known by
+/// different names in different systems:
+///
+/// "labels" in
+/// [prometheus](https://prometheus.io/docs/concepts/data_model/) and
+/// "tags" in
+/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/)
+/// , "attributes" in [open
+/// telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md],
+/// etc.
+///
+/// As the name and value are expected to mostly be constant strings,
+/// use a `Cow` to avoid copying / allocations in this common case.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Label {
+    name: Cow<'static, str>,
+    value: Cow<'static, str>,
+}
+
+impl Label {
+    /// Create a new Label
+    pub fn new(
+        name: impl Into<Cow<'static, str>>,
+        value: impl Into<Cow<'static, str>>,
+    ) -> Self {
+        let name = name.into();
+        let value = value.into();
+        Self { name, value }
+    }
+}
+
+impl Display for Label {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}={}", self.name, self.value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[test]
+    fn test_display_no_labels_no_partition() {
+        let count = Count::new();
+        count.add(33);
+        let value = MetricValue::OutputRows(count);
+        let partition = None;
+        let metric = Metric::new(value, partition);
+
+        assert_eq!("output_rows=33", metric.to_string())
+    }
+
+    #[test]
+    fn test_display_no_labels_with_partition() {
+        let count = Count::new();
+        count.add(44);
+        let value = MetricValue::OutputRows(count);
+        let partition = Some(1);
+        let metric = Metric::new(value, partition);
+
+        assert_eq!("output_rows{partition=1}=44", metric.to_string())
+    }
+
+    #[test]
+    fn test_display_labels_no_partition() {
+        let count = Count::new();
+        count.add(55);
+        let value = MetricValue::OutputRows(count);
+        let partition = None;
+        let label = Label::new("foo", "bar");
+        let metric = Metric::new_with_labels(value, partition, vec![label]);
+
+        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
+    }
+
+    #[test]
+    fn test_display_labels_and_partition() {
+        let count = Count::new();
+        count.add(66);
+        let value = MetricValue::OutputRows(count);
+        let partition = Some(2);
+        let label = Label::new("foo", "bar");
+        let metric = Metric::new_with_labels(value, partition, vec![label]);
+
+        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
+    }
+
+    #[test]
+    fn test_output_rows() {
+        let metrics = ExecutionPlanMetricsSet::new();
+        assert!(metrics.clone_inner().output_rows().is_none());
+
+        let partition = 1;
+        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
+        output_rows.add(13);
+
+        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
+        output_rows.add(7);
+        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
+    }
+
+    #[test]
+    fn test_cpu_time() {
+        let metrics = ExecutionPlanMetricsSet::new();
+        assert!(metrics.clone_inner().cpu_time().is_none());
+
+        let partition = 1;
+        let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition);
+        cpu_time.add_duration(Duration::from_nanos(1234));
+
+        let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition + 1);
+        cpu_time.add_duration(Duration::from_nanos(6));
+        assert_eq!(metrics.clone_inner().cpu_time().unwrap(), 1240);
+    }
+
+    #[test]
+    fn test_sum() {
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        let count1 = MetricBuilder::new(&metrics)
+            .with_new_label("foo", "bar")
+            .counter("my_counter", 1);
+        count1.add(1);
+
+        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
+        count2.add(2);
+
+        let metrics = metrics.clone_inner();
+        assert!(metrics.sum(|_| false).is_none());
+
+        let expected_count = Count::new();
+        expected_count.add(3);
+        let expected_sum = MetricValue::Count {
+            name: "my_counter".into(),
+            count: expected_count,
+        };
+
+        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
+    }
+
+    #[test]
+    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
+    fn test_bad_sum() {
+        // can not add different kinds of metrics
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
+        count.add(1);
+
+        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
+        time.add_duration(Duration::from_nanos(10));
+
+        // expect that this will error out
+        metrics.clone_inner().sum(|_| true);
+    }
+
+    #[test]
+    fn test_aggregate_partition() {
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        // Note cpu_time1 has labels so it is not aggregated with 2 and 3
+        let cpu_time1 = MetricBuilder::new(&metrics)
+            .with_new_label("foo", "bar")
+            .cpu_time(1);
+        cpu_time1.add_duration(Duration::from_nanos(12));
+
+        let cpu_time2 = MetricBuilder::new(&metrics).cpu_time(2);
+        cpu_time2.add_duration(Duration::from_nanos(34));
+
+        let cpu_time3 = MetricBuilder::new(&metrics).cpu_time(4);
+        cpu_time3.add_duration(Duration::from_nanos(56));
+
+        let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
+        output_rows.add(56);
+
+        let aggregated = metrics.clone_inner().aggregate_by_partition();
+
+        // cpu time should be aggregated:
+        let cpu_times = aggregated
+            .iter()
+            .filter(|metric| {
+                matches!(metric.value(), MetricValue::CPUTime(_))
+                    && metric.labels().is_empty()
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(cpu_times.len(), 1);
+        assert_eq!(cpu_times[0].value().as_usize(), 34 + 56);
+        assert!(cpu_times[0].partition().is_none());
+
+        // output rows should be aggregated as well:
+        let output_rows = aggregated
+            .iter()
+            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
+            .collect::<Vec<_>>();
+        assert_eq!(output_rows.len(), 1);
+        assert_eq!(output_rows[0].value().as_usize(), 56);
+        assert!(output_rows[0].partition.is_none())
+    }
+
+    #[test]
+    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
+    fn test_aggregate_partition_bad_sum() {
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
+        count.add(1);
+
+        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
+        time.add_duration(Duration::from_nanos(10));
+
+        // can't aggregate time and count -- expect a panic
+        metrics.clone_inner().aggregate_by_partition();
+    }
+}
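
Tying the pieces together, a `MetricsSet` snapshot can be printed raw or
aggregated; this short sketch follows the tests above:

    use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

    let metrics = ExecutionPlanMetricsSet::new();
    MetricBuilder::new(&metrics).output_rows(0).add(10);
    MetricBuilder::new(&metrics).output_rows(1).add(32);

    let set = metrics.clone_inner();
    // Raw view: output_rows{partition=0}=10, output_rows{partition=1}=32
    println!("{}", set);
    // Aggregated view: output_rows=42
    println!("{}", set.aggregate_by_partition());
    assert_eq!(set.output_rows(), Some(42));
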
diff --git a/datafusion/src/physical_plan/metrics/value.rs b/datafusion/src/physical_plan/metrics/value.rs
new file mode 100644
index 0000000..61a7c2b
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Value representation of metrics
+
+use std::{
+    borrow::{Borrow, Cow},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::{Duration, Instant},
+};
+
+/// A counter to record things such as number of input or output rows
+///
+/// Note: `clone`ing a counter updates the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Count {
+    /// value of the metric counter
+    value: std::sync::Arc<AtomicUsize>,
+}
+
+impl PartialEq for Count {
+    fn eq(&self, other: &Self) -> bool {
+        self.value().eq(&other.value())
+    }
+}
+
+impl Count {
+    /// create a new counter
+    pub fn new() -> Self {
+        Self {
+            value: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+}
+
+/// Measure a potentially non-contiguous duration of time
+#[derive(Debug, Clone)]
+pub struct Time {
+    /// elapsed time, in nanoseconds
+    nanos: Arc<AtomicUsize>,
+}
+
+impl PartialEq for Time {
+    fn eq(&self, other: &Self) -> bool {
+        self.value().eq(&other.value())
+    }
+}
+
+impl Time {
+    /// Create a new [`Time`] wrapper suitable for recording elapsed
+    /// times for operations.
+    pub fn new() -> Self {
+        Self {
+            nanos: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    /// Add elapsed nanoseconds since `start` to self
+    pub fn add_elapsed(&self, start: Instant) {
+        self.add_duration(start.elapsed());
+    }
+
+    /// Add duration of time to self
+    pub fn add_duration(&self, duration: Duration) {
+        let more_nanos = duration.as_nanos() as usize;
+        self.nanos.fetch_add(more_nanos, Ordering::Relaxed);
+    }
+
+    /// Add the number of nanoseconds recorded by `other` to self
+    pub fn add(&self, other: &Time) {
+        self.nanos.fetch_add(other.value(), Ordering::Relaxed);
+    }
+
+    /// Return a scoped guard that adds the time elapsed between its
+    /// creation and its drop (or an explicit call to `stop`) to the
+    /// underlying metric.
+    pub fn timer(&self) -> ScopedTimerGuard<'_> {
+        ScopedTimerGuard {
+            inner: self,
+            start: Some(Instant::now()),
+        }
+    }
+
+    /// Get the number of nanoseconds recorded by this Time metric
+    pub fn value(&self) -> usize {
+        self.nanos.load(Ordering::Relaxed)
+    }
+}
+
+/// RAII structure that adds all time between its construction and
+/// destruction (or the first call to `stop`, whichever comes first)
+/// to the underlying [`Time`] metric
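+///
+/// For example, to time only the first part of a scope (a sketch,
+/// assuming `Time` is re-exported as above):
+///
+/// ```
+/// use datafusion::physical_plan::metrics::Time;
+///
+/// let time = Time::new();
+/// let mut timer = time.timer();
+/// // ... timed work ...
+/// timer.stop();
+/// // ... subsequent work is not timed; dropping the stopped
+/// // guard records nothing further ...
+/// ```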
+pub struct ScopedTimerGuard<'a> {
+    inner: &'a Time,
+    start: Option<Instant>,
+}
+
+impl<'a> ScopedTimerGuard<'a> {
+    /// Stop the timer and record the elapsed time
+    pub fn stop(&mut self) {
+        if let Some(start) = self.start.take() {
+            self.inner.add_elapsed(start)
+        }
+    }
+
+    /// Stop the timer, record the time taken and consume self
+    pub fn done(mut self) {
+        self.stop()
+    }
+}
+
+impl<'a> Drop for ScopedTimerGuard<'a> {
+    fn drop(&mut self) {
+        self.stop()
+    }
+}
+
+/// Possible values for a metric.
+///
+/// Among other differences, the metric types have different ways to
+/// logically interpret their underlying values and some metrics are
+/// so common they are given special treatment.
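+///
+/// For example, an operator-defined counter can be wrapped as a
+/// [`MetricValue`] directly (a sketch; operators normally create
+/// metrics through `MetricBuilder` instead):
+///
+/// ```
+/// use datafusion::physical_plan::metrics::{Count, MetricValue};
+///
+/// let count = Count::new();
+/// count.add(42);
+/// let value = MetricValue::Count {
+///     name: "my_counter".into(),
+///     count,
+/// };
+/// assert_eq!(value.name(), "my_counter");
+/// assert_eq!(value.as_usize(), 42);
+/// ```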
+#[derive(Debug, Clone, PartialEq)]
+pub enum MetricValue {
+    /// Number of output rows produced: "output_rows" metric
+    OutputRows(Count),
+    /// CPU time: the "cpu_time" metric
+    CPUTime(Time),
+    /// Operator-defined count
+    Count {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// The value of the metric
+        count: Count,
+    },
+    /// Operator-defined time
+    Time {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// The value of the metric
+        time: Time,
+    },
+    // TODO timestamp, etc
+    // https://github.com/apache/arrow-datafusion/issues/866
+}
+
+impl MetricValue {
+    /// Return the name of this metric
+    pub fn name(&self) -> &str {
+        match self {
+            Self::OutputRows(_) => "output_rows",
+            Self::CPUTime(_) => "cpu_time",
+            Self::Count { name, .. } => name.borrow(),
+            Self::Time { name, .. } => name.borrow(),
+        }
+    }
+
+    /// Return the value of the metric as a usize value
+    pub fn as_usize(&self) -> usize {
+        match self {
+            Self::OutputRows(count) => count.value(),
+            Self::CPUTime(time) => time.value(),
+            Self::Count { count, .. } => count.value(),
+            Self::Time { time, .. } => time.value(),
+        }
+    }
+
+    /// Create a new MetricValue of the same type as `self`, suitable
+    /// for accumulating values via `add`
+    pub fn new_empty(&self) -> Self {
+        match self {
+            Self::OutputRows(_) => Self::OutputRows(Count::new()),
+            Self::CPUTime(_) => Self::CPUTime(Time::new()),
+            Self::Count { name, .. } => Self::Count {
+                name: name.clone(),
+                count: Count::new(),
+            },
+            Self::Time { name, .. } => Self::Time {
+                name: name.clone(),
+                time: Time::new(),
+            },
+        }
+    }
+
+    /// Add the value of `other` to `self`. Panics if the types are
+    /// mismatched or aggregation does not make sense for this value
+    ///
+    /// Note this is purposely marked `mut` (even though atomics are
+    /// used) so Rust's type system can be used to ensure the
+    /// appropriate API access. `MetricValues` should be modified
+    /// using the original [`Count`] or [`Time`] they were created
+    /// from.
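+    ///
+    /// For example, per-partition values can be accumulated into a
+    /// total (a sketch using the types in this module):
+    ///
+    /// ```
+    /// use datafusion::physical_plan::metrics::{Count, MetricValue};
+    ///
+    /// let c1 = Count::new();
+    /// c1.add(5);
+    /// let c2 = Count::new();
+    /// c2.add(7);
+    ///
+    /// let v1 = MetricValue::OutputRows(c1);
+    /// let v2 = MetricValue::OutputRows(c2);
+    ///
+    /// // accumulate into a fresh value of the same type
+    /// let mut total = v1.new_empty();
+    /// total.add(&v1);
+    /// total.add(&v2);
+    /// assert_eq!(total.as_usize(), 12);
+    /// ```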
+    pub fn add(&mut self, other: &Self) {
+        match (self, other) {
+            (Self::OutputRows(count), Self::OutputRows(other_count))
+            | (
+                Self::Count { count, .. },
+                Self::Count {
+                    count: other_count, ..
+                },
+            ) => count.add(other_count.value()),
+            (Self::CPUTime(time), Self::CPUTime(other_time))
+            | (
+                Self::Time { time, .. },
+                Self::Time {
+                    time: other_time, ..
+                },
+            ) => time.add(other_time),
+            m @ (_, _) => {
+                panic!(
+                    "Mismatched metric types. Can not aggregate {:?} with value {:?}",
+                    m.0, m.1
+                )
+            }
+        }
+    }
+}
+
+impl std::fmt::Display for MetricValue {
+    /// Prints the value of this metric
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::OutputRows(count) | Self::Count { count, .. } => {
+                write!(f, "{}", count.value())
+            }
+            Self::CPUTime(time) | Self::Time { time, .. } => {
+                let duration = std::time::Duration::from_nanos(time.value() as u64);
+                write!(f, "{:?}", duration)
+            }
+        }
+    }
+}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 8f7db72..82c297c 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,14 +17,8 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::{any::Any, pin::Pin};
-
+pub use self::metrics::Metric;
+use self::metrics::MetricsSet;
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
@@ -42,7 +36,12 @@ use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
-use hashbrown::HashMap;
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -87,72 +86,6 @@ impl Stream for EmptyRecordBatchStream {
     }
 }
 
-/// SQL metric type
-#[derive(Debug, Clone)]
-pub enum MetricType {
-    /// Simple counter
-    Counter,
-    /// Wall clock time in nanoseconds
-    TimeNanos,
-}
-
-/// SQL metric such as counter (number of input or output rows) or timing information about
-/// a physical operator.
-#[derive(Debug)]
-pub struct SQLMetric {
-    /// Metric value
-    value: AtomicUsize,
-    /// Metric type
-    metric_type: MetricType,
-}
-
-impl Clone for SQLMetric {
-    fn clone(&self) -> Self {
-        Self {
-            value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
-            metric_type: self.metric_type.clone(),
-        }
-    }
-}
-
-impl SQLMetric {
-    // relaxed ordering for operations on `value` poses no issues
-    // we're purely using atomic ops with no associated memory ops
-
-    /// Create a new metric for tracking a counter
-    pub fn counter() -> Arc<SQLMetric> {
-        Arc::new(SQLMetric::new(MetricType::Counter))
-    }
-
-    /// Create a new metric for tracking time in nanoseconds
-    pub fn time_nanos() -> Arc<SQLMetric> {
-        Arc::new(SQLMetric::new(MetricType::TimeNanos))
-    }
-
-    /// Create a new SQLMetric
-    pub fn new(metric_type: MetricType) -> Self {
-        Self {
-            value: AtomicUsize::new(0),
-            metric_type,
-        }
-    }
-
-    /// Add to the value
-    pub fn add(&self, n: usize) {
-        self.value.fetch_add(n, Ordering::Relaxed);
-    }
-
-    /// Add elapsed nanoseconds since `start`to self
-    pub fn add_elapsed(&self, start: std::time::Instant) {
-        self.add(start.elapsed().as_nanos() as usize)
-    }
-
-    /// Get the current value
-    pub fn value(&self) -> usize {
-        self.value.load(Ordering::Relaxed)
-    }
-}
-
 /// Physical planner interface
 pub use self::planner::PhysicalPlanner;
 
@@ -193,9 +126,19 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// creates an iterator
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
 
-    /// Return a snapshot of the metrics collected during execution
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        HashMap::new()
+    /// Return a snapshot of the set of [`Metric`]s for this
+    /// [`ExecutionPlan`].
+    ///
+    /// While the values of the metrics in the returned
+    /// [`MetricsSet`]s may change as execution progresses, the
+    /// specific metrics will not.
+    ///
+    /// Once `self.execute()` has returned (technically the future is
+    /// resolved) for all available partitions, the set of metrics
+    /// should be complete. If this function is called prior to
+    /// `execute()`, new metrics may appear in subsequent calls.
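+    ///
+    /// For example, a caller could print a snapshot of an operator's
+    /// metrics like this (a sketch; assumes `plan` implements
+    /// [`ExecutionPlan`] and that each [`Metric`] implements `Display`):
+    ///
+    /// ```ignore
+    /// if let Some(metrics) = plan.metrics() {
+    ///     for metric in metrics.iter() {
+    ///         println!("{}", metric);
+    ///     }
+    /// }
+    /// ```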
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
     }
 
     /// Format this `ExecutionPlan` to `f` in the specified type.
@@ -330,20 +273,6 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
     Ok(())
 }
 
-/// Recursively gateher all execution metrics from this plan and all of its input plans
-pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric> {
-    fn get_metrics_inner(
-        plan: &dyn ExecutionPlan,
-        mut metrics: HashMap<String, SQLMetric>,
-    ) -> HashMap<String, SQLMetric> {
-        metrics.extend(plan.metrics().into_iter());
-        plan.children().into_iter().fold(metrics, |metrics, child| {
-            get_metrics_inner(child.as_ref(), metrics)
-        })
-    }
-    get_metrics_inner(plan.as_ref(), HashMap::new())
-}
-
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     let stream = execute_stream(plan).await?;
@@ -663,6 +592,7 @@ pub mod json;
 pub mod limit;
 pub mod math_expressions;
 pub mod memory;
+pub mod metrics;
 pub mod parquet;
 pub mod planner;
 pub mod projection;
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ff8bb5b..f85495c 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -38,7 +38,6 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use hashbrown::HashMap;
 use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
@@ -57,8 +56,8 @@ use tokio::{
 use crate::datasource::datasource::{ColumnStatistics, Statistics};
 use async_trait::async_trait;
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::stream::RecordBatchReceiverStream;
-use super::SQLMetric;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::Accumulator;
 
@@ -75,8 +74,8 @@ pub struct ParquetExec {
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
-    /// metrics for the overall execution
-    metrics: ParquetExecMetrics,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
     /// Optional predicate builder
     predicate_builder: Option<PruningPredicate>,
     /// Optional limit of the number of rows
@@ -99,23 +98,16 @@ pub struct ParquetPartition {
     /// Statistics for this partition
     pub statistics: Statistics,
     /// Execution metrics
-    metrics: ParquetPartitionMetrics,
-}
-
-/// Stores metrics about the overall parquet execution
-#[derive(Debug, Clone)]
-pub struct ParquetExecMetrics {
-    /// Numer of times the pruning predicate could not be created
-    pub predicate_creation_errors: Arc<SQLMetric>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
-/// Stores metrics about the parquet execution for a particular ParquetPartition
+/// Stores metrics about the parquet execution for a particular parquet file
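+///
+/// Each instance is labeled with the file name, so metrics from
+/// different files scanned by the same plan can be told apart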
 #[derive(Debug, Clone)]
-struct ParquetPartitionMetrics {
+struct ParquetFileMetrics {
     /// Number of times the predicate could not be evaluated
-    pub predicate_evaluation_errors: Arc<SQLMetric>,
+    pub predicate_evaluation_errors: metrics::Count,
     /// Number of row groups pruned using the pruning predicate
-    pub row_groups_pruned: Arc<SQLMetric>,
+    pub row_groups_pruned: metrics::Count,
 }
 
 impl ParquetExec {
@@ -167,6 +159,7 @@ impl ParquetExec {
                filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
+        let metrics = ExecutionPlanMetricsSet::new();
         let mut schemas: Vec<Schema> = vec![];
         let mut partitions = Vec::with_capacity(max_concurrency);
         let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
@@ -393,7 +386,11 @@ impl ParquetExec {
             };
             // remove files that are not needed in case of limit
             filenames.truncate(total_files);
-            partitions.push(ParquetPartition::new(filenames, statistics));
+            partitions.push(ParquetPartition::new(
+                filenames,
+                statistics,
+                metrics.clone(),
+            ));
             if limit_exhausted {
                 break;
             }
@@ -410,7 +407,10 @@ impl ParquetExec {
             )));
         }
         let schema = Arc::new(schemas.pop().unwrap());
-        let metrics = ParquetExecMetrics::new();
+
+        let metrics = ExecutionPlanMetricsSet::new();
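+        // the pruning predicate is created once for the whole scan,
+        // so count creation errors with a global (plan-wide) counter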
+        let predicate_creation_errors =
+            MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
 
         let predicate_builder = predicate.and_then(|predicate_expr| {
             match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
@@ -420,7 +420,7 @@ impl ParquetExec {
                         "Could not create pruning predicate for {:?}: {}",
                         predicate_expr, e
                     );
-                    metrics.predicate_creation_errors.add(1);
+                    predicate_creation_errors.add(1);
                     None
                 }
             }
@@ -442,7 +442,7 @@ impl ParquetExec {
         partitions: Vec<ParquetPartition>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
-        metrics: ParquetExecMetrics,
+        metrics: ExecutionPlanMetricsSet,
         predicate_builder: Option<PruningPredicate>,
         batch_size: usize,
         limit: Option<usize>,
@@ -582,11 +582,15 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        statistics: Statistics,
+        metrics: ExecutionPlanMetricsSet,
+    ) -> Self {
         Self {
             filenames,
             statistics,
-            metrics: ParquetPartitionMetrics::new(),
+            metrics,
         }
     }
 
@@ -601,21 +605,24 @@ impl ParquetPartition {
     }
 }
 
-impl ParquetExecMetrics {
+impl ParquetFileMetrics {
     /// Create new metrics
-    pub fn new() -> Self {
-        Self {
-            predicate_creation_errors: SQLMetric::counter(),
-        }
-    }
-}
+    pub fn new(
+        partition: usize,
+        filename: &str,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        let predicate_evaluation_errors = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_evaluation_errors", partition);
+
+        let row_groups_pruned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("row_groups_pruned", partition);
 
-impl ParquetPartitionMetrics {
-    /// Create new metrics
-    pub fn new() -> Self {
         Self {
-            predicate_evaluation_errors: SQLMetric::counter(),
-            row_groups_pruned: SQLMetric::counter(),
+            predicate_evaluation_errors,
+            row_groups_pruned,
         }
     }
 }
@@ -663,9 +670,9 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let partition = &self.partitions[partition];
-        let filenames = partition.filenames.clone();
-        let metrics = partition.metrics.clone();
+        let parquet_partition = &self.partitions[partition];
+        let filenames = parquet_partition.filenames.clone();
+        let metrics = self.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;
@@ -673,6 +680,7 @@ impl ExecutionPlan for ParquetExec {
 
         task::spawn_blocking(move || {
             if let Err(e) = read_files(
+                partition,
                 &filenames,
                 metrics,
                 &projection,
@@ -714,29 +722,8 @@ impl ExecutionPlan for ParquetExec {
         }
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        self.partitions
-            .iter()
-            .flat_map(|p| {
-                vec![
-                    (
-                        format!(
-                            "numPredicateEvaluationErrors for {}",
-                            p.filenames.join(",")
-                        ),
-                        p.metrics.predicate_evaluation_errors.as_ref().clone(),
-                    ),
-                    (
-                        format!("numRowGroupsPruned for {}", p.filenames.join(",")),
-                        p.metrics.row_groups_pruned.as_ref().clone(),
-                    ),
-                ]
-            })
-            .chain(std::iter::once((
-                "numPredicateCreationErrors".to_string(),
-                self.metrics.predicate_creation_errors.as_ref().clone(),
-            )))
-            .collect()
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }
 
@@ -837,7 +824,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 
 fn build_row_group_predicate(
     predicate_builder: &PruningPredicate,
-    metrics: ParquetPartitionMetrics,
+    metrics: ParquetFileMetrics,
     row_group_metadata: &[RowGroupMetaData],
 ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
     let parquet_schema = predicate_builder.schema().as_ref();
@@ -865,9 +852,11 @@ fn build_row_group_predicate(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 fn read_files(
+    partition: usize,
     filenames: &[String],
-    metrics: ParquetPartitionMetrics,
+    metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
     batch_size: usize,
@@ -876,12 +865,13 @@ fn read_files(
 ) -> Result<()> {
     let mut total_rows = 0;
     'outer: for filename in filenames {
+        let file_metrics = ParquetFileMetrics::new(partition, filename, &metrics);
         let file = File::open(&filename)?;
         let mut file_reader = SerializedFileReader::new(file)?;
         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate = build_row_group_predicate(
                 predicate_builder,
-                metrics.clone(),
+                file_metrics,
                 file_reader.metadata().row_groups(),
             );
             file_reader.filter_row_groups(&row_group_predicate);
@@ -1016,6 +1006,11 @@ mod tests {
         Ok(())
     }
 
+    fn parquet_file_metrics() -> ParquetFileMetrics {
+        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
+        ParquetFileMetrics::new(0, "file.parquet", &metrics)
+    }
+
     #[test]
     fn row_group_predicate_builder_simple_expr() -> Result<()> {
         use crate::logical_plan::{col, lit};
@@ -1036,7 +1031,7 @@ mod tests {
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate = build_row_group_predicate(
             &predicate_builder,
-            ParquetPartitionMetrics::new(),
+            parquet_file_metrics(),
             &row_group_metadata,
         );
         let row_group_filter = row_group_metadata
@@ -1069,7 +1064,7 @@ mod tests {
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate = build_row_group_predicate(
             &predicate_builder,
-            ParquetPartitionMetrics::new(),
+            parquet_file_metrics(),
             &row_group_metadata,
         );
         let row_group_filter = row_group_metadata
@@ -1117,7 +1112,7 @@ mod tests {
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate = build_row_group_predicate(
             &predicate_builder,
-            ParquetPartitionMetrics::new(),
+            parquet_file_metrics(),
             &row_group_metadata,
         );
         let row_group_filter = row_group_metadata
@@ -1135,7 +1130,7 @@ mod tests {
         let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
         let row_group_predicate = build_row_group_predicate(
             &predicate_builder,
-            ParquetPartitionMetrics::new(),
+            parquet_file_metrics(),
             &row_group_metadata,
         );
         let row_group_filter = row_group_metadata
@@ -1182,7 +1177,7 @@ mod tests {
         let row_group_metadata = vec![rgm1, rgm2];
         let row_group_predicate = build_row_group_predicate(
             &predicate_builder,
-            ParquetPartitionMetrics::new(),
+            parquet_file_metrics(),
             &row_group_metadata,
         );
         let row_group_filter = row_group_metadata
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index eb3fe55..8ba9a4f 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -21,17 +21,17 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::time::Instant;
 use std::{any::Any, vec};
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;
 
@@ -63,38 +63,48 @@ pub struct RepartitionExec {
     >,
 
     /// Execution metrics
-    metrics: RepartitionMetrics,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 #[derive(Debug, Clone)]
 struct RepartitionMetrics {
     /// Time in nanos to execute child operator and fetch batches
-    fetch_nanos: Arc<SQLMetric>,
+    fetch_time: metrics::Time,
     /// Time in nanos to perform repartitioning
-    repart_nanos: Arc<SQLMetric>,
+    repart_time: metrics::Time,
     /// Time in nanos for sending resulting batches to channels
-    send_nanos: Arc<SQLMetric>,
+    send_time: metrics::Time,
 }
 
 impl RepartitionMetrics {
-    fn new() -> Self {
+    pub fn new(
+        output_partition: usize,
+        input_partition: usize,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        let label = metrics::Label::new("inputPartition", input_partition.to_string());
+
+        // Time in nanos to execute child operator and fetch batches
+        let fetch_time = MetricBuilder::new(metrics)
+            .with_label(label.clone())
+            .subset_time("fetch_time", output_partition);
+
+        // Time in nanos to perform repartitioning
+        let repart_time = MetricBuilder::new(metrics)
+            .with_label(label.clone())
+            .subset_time("repart_time", output_partition);
+
+        // Time in nanos for sending resulting batches to channels
+        let send_time = MetricBuilder::new(metrics)
+            .with_label(label)
+            .subset_time("send_time", output_partition);
+
         Self {
-            fetch_nanos: SQLMetric::time_nanos(),
-            repart_nanos: SQLMetric::time_nanos(),
-            send_nanos: SQLMetric::time_nanos(),
+            fetch_time,
+            repart_time,
+            send_time,
         }
     }
-    /// Convert into the external metrics form
-    fn to_hashmap(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone());
-        metrics.insert(
-            "repartitionTime".to_owned(),
-            self.repart_nanos.as_ref().clone(),
-        );
-        metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone());
-        metrics
-    }
 }
 
 impl RepartitionExec {
@@ -175,6 +185,8 @@ impl ExecutionPlan for RepartitionExec {
                     .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                     .collect();
 
+                let r_metrics = RepartitionMetrics::new(i, partition, &self.metrics);
+
                 let input_task: JoinHandle<Result<()>> =
                     tokio::spawn(Self::pull_from_input(
                         random.clone(),
@@ -182,7 +194,7 @@ impl ExecutionPlan for RepartitionExec {
                         i,
                         txs.clone(),
                         self.partitioning.clone(),
-                        self.metrics.clone(),
+                        r_metrics,
                     ));
 
                 // In a separate task, wait for each input to be done
@@ -201,8 +213,8 @@ impl ExecutionPlan for RepartitionExec {
         }))
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        self.metrics.to_hashmap()
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 
     fn fmt_as(
@@ -228,7 +240,7 @@ impl RepartitionExec {
             input,
             partitioning,
             channels: Arc::new(Mutex::new(HashMap::new())),
-            metrics: RepartitionMetrics::new(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -244,14 +256,14 @@ impl RepartitionExec {
         i: usize,
         mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
         partitioning: Partitioning,
-        metrics: RepartitionMetrics,
+        r_metrics: RepartitionMetrics,
     ) -> Result<()> {
         let num_output_partitions = txs.len();
 
         // execute the child operator
-        let now = Instant::now();
+        let timer = r_metrics.fetch_time.timer();
         let mut stream = input.execute(i).await?;
-        metrics.fetch_nanos.add_elapsed(now);
+        timer.done();
 
         let mut counter = 0;
         let hashes_buf = &mut vec![];
@@ -260,9 +272,9 @@ impl RepartitionExec {
         // pulling inputs
         while !txs.is_empty() {
             // fetch the next batch
-            let now = Instant::now();
+            let timer = r_metrics.fetch_time.timer();
             let result = stream.next().await;
-            metrics.fetch_nanos.add_elapsed(now);
+            timer.done();
 
             // Input is done
             if result.is_none() {
@@ -272,7 +284,7 @@ impl RepartitionExec {
 
             match &partitioning {
                 Partitioning::RoundRobinBatch(_) => {
-                    let now = Instant::now();
+                    let timer = r_metrics.send_time.timer();
                     let output_partition = counter % num_output_partitions;
                     // if there is still a receiver, send to it
                     if let Some(tx) = txs.get_mut(&output_partition) {
@@ -281,10 +293,10 @@ impl RepartitionExec {
                             txs.remove(&output_partition);
                         }
                     }
-                    metrics.send_nanos.add_elapsed(now);
+                    timer.done();
                 }
                 Partitioning::Hash(exprs, _) => {
-                    let now = Instant::now();
+                    let timer = r_metrics.repart_time.timer();
                     let input_batch = result?;
                     let arrays = exprs
                         .iter()
@@ -303,11 +315,12 @@ impl RepartitionExec {
                         indices[(*hash % num_output_partitions as u64) as usize]
                             .push(index as u64)
                     }
-                    metrics.repart_nanos.add_elapsed(now);
+                    timer.done();
+
                     for (num_output_partition, partition_indices) in
                         indices.into_iter().enumerate()
                     {
-                        let now = Instant::now();
+                        let timer = r_metrics.repart_time.timer();
                         let indices = partition_indices.into();
                         // Produce batches based on indices
                         let columns = input_batch
@@ -321,8 +334,9 @@ impl RepartitionExec {
                             .collect::<Result<Vec<Arc<dyn Array>>>>()?;
                         let output_batch =
                             RecordBatch::try_new(input_batch.schema(), columns);
-                        metrics.repart_nanos.add_elapsed(now);
-                        let now = Instant::now();
+                        timer.done();
+
+                        let timer = r_metrics.send_time.timer();
                         // if there is still a receiver, send to it
                         if let Some(tx) = txs.get_mut(&num_output_partition) {
                             if tx.send(Some(output_batch)).is_err() {
@@ -330,7 +344,7 @@ impl RepartitionExec {
                                 txs.remove(&num_output_partition);
                             }
                         }
-                        metrics.send_nanos.add_elapsed(now);
+                        timer.done();
                     }
                 }
                 other => {
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index faaa10d..f1346b5 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -17,11 +17,12 @@
 
 //! Defines the SORT plan
 
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
 };
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
@@ -32,13 +33,11 @@ use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
 use futures::stream::Stream;
 use futures::Future;
-use hashbrown::HashMap;
 use pin_project_lite::pin_project;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::time::Instant;
 
 /// Sort execution plan
 #[derive(Debug)]
@@ -47,10 +46,8 @@ pub struct SortExec {
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
-    /// Output rows
-    output_rows: Arc<SQLMetric>,
-    /// Time to sort batches
-    sort_time_nanos: Arc<SQLMetric>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
     /// Preserve partitions of input plan
     preserve_partitioning: bool,
 }
@@ -74,9 +71,8 @@ impl SortExec {
         Self {
             expr,
             input,
+            metrics: ExecutionPlanMetricsSet::new(),
             preserve_partitioning,
-            output_rows: SQLMetric::counter(),
-            sort_time_nanos: SQLMetric::time_nanos(),
         }
     }
 
@@ -157,11 +153,15 @@ impl ExecutionPlan for SortExec {
 
         let input = self.input.execute(partition).await?;
 
+        let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
+
+        let cpu_time = MetricBuilder::new(&self.metrics).cpu_time(partition);
+
         Ok(Box::pin(SortStream::new(
             input,
             self.expr.clone(),
-            self.output_rows.clone(),
-            self.sort_time_nanos.clone(),
+            output_rows,
+            cpu_time,
         )))
     }
 
@@ -178,11 +178,8 @@ impl ExecutionPlan for SortExec {
         }
     }
 
-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
-        metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }
 
@@ -229,7 +226,7 @@ pin_project! {
         output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
         finished: bool,
         schema: SchemaRef,
-        output_rows: Arc<SQLMetric>,
+        output_rows: metrics::Count
     }
 }
 
@@ -237,8 +234,8 @@ impl SortStream {
     fn new(
         input: SendableRecordBatchStream,
         expr: Vec<PhysicalSortExpr>,
-        output_rows: Arc<SQLMetric>,
-        sort_time: Arc<SQLMetric>,
+        output_rows: metrics::Count,
+        sort_time: metrics::Time,
     ) -> Self {
         let (tx, rx) = futures::channel::oneshot::channel();
         let schema = input.schema();
@@ -248,14 +245,14 @@ impl SortStream {
                 .await
                 .map_err(DataFusionError::into_arrow_external_error)
                 .and_then(move |batches| {
-                    let now = Instant::now();
+                    let timer = sort_time.timer();
                     // combine all record batches into one for each column
                     let combined = common::combine_batches(&batches, schema.clone())?;
                     // sort combined record batch
                     let result = combined
                         .map(|batch| sort_batch(batch, schema, &expr))
                         .transpose()?;
-                    sort_time.add(now.elapsed().as_nanos() as usize);
+                    timer.done();
                     Ok(result)
                 });
 
@@ -438,8 +435,9 @@ mod tests {
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
 
         let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
-        assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
-        assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
+        let metrics = sort_exec.metrics().unwrap();
+        assert!(metrics.cpu_time().unwrap() > 0);
+        assert_eq!(metrics.output_rows().unwrap(), 8);
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 789f081..14f5dd2 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -33,11 +33,13 @@ use chrono::{Datelike, Duration};
 use datafusion::{
     datasource::TableProvider,
     logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder},
-    physical_plan::{plan_metrics, SQLMetric},
+    physical_plan::{
+        accept, metrics::MetricsSet, parquet::ParquetExec, ExecutionPlan,
+        ExecutionPlanVisitor,
+    },
     prelude::{ExecutionConfig, ExecutionContext},
     scalar::ScalarValue,
 };
-use hashbrown::HashMap;
 use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
 use tempfile::NamedTempFile;
 
@@ -417,6 +419,9 @@ enum Scenario {
 /// table "t" registered, pointing at a parquet file made with
 /// `make_test_file`
 struct ContextWithParquet {
+    #[allow(dead_code)]
+    /// Temp file the parquet data is written to. The file is cleaned
+    /// up when dropped
     file: NamedTempFile,
     provider: Arc<dyn TableProvider>,
     ctx: ExecutionContext,
@@ -426,8 +431,8 @@ struct ContextWithParquet {
 struct TestOutput {
     /// The input string
     sql: String,
-    /// Normalized metrics (filename replaced by a constant)
-    metrics: HashMap<String, SQLMetric>,
+    /// Execution metrics for the Parquet Scan
+    parquet_metrics: MetricsSet,
     /// number of rows in results
     result_rows: usize,
     /// the contents of the input, as a string
@@ -439,32 +444,25 @@ struct TestOutput {
 impl TestOutput {
     /// retrieve the value of the named metric, if any
     fn metric_value(&self, metric_name: &str) -> Option<usize> {
-        self.metrics.get(metric_name).map(|m| m.value())
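+        // sum the values of all metrics with a matching name; there
+        // may be several (e.g. one per partition or per labeled file)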
+        self.parquet_metrics
+            .sum(|metric| metric.value().name() == metric_name)
+            .map(|v| v.as_usize())
     }
 
     /// The number of times the pruning predicate could not be evaluated
     fn predicate_evaluation_errors(&self) -> Option<usize> {
-        self.metric_value("numPredicateEvaluationErrors for PARQUET_FILE")
+        self.metric_value("predicate_evaluation_errors")
     }
 
     /// The number of row groups pruned by the pruning predicate
     fn row_groups_pruned(&self) -> Option<usize> {
-        self.metric_value("numRowGroupsPruned for PARQUET_FILE")
+        self.metric_value("row_groups_pruned")
     }
 
     fn description(&self) -> String {
-        let metrics = self
-            .metrics
-            .iter()
-            .map(|(name, val)| format!("  {} = {:?}", name, val))
-            .collect::<Vec<_>>();
-
         format!(
             "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
-            self.pretty_input,
-            self.sql,
-            self.pretty_results,
-            metrics.join("\n")
+            self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
         )
     }
 }
@@ -532,25 +530,35 @@ impl ContextWithParquet {
         let pretty_input = pretty_format_batches(&input).unwrap();
 
         let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
-        let execution_plan = self
+        let physical_plan = self
             .ctx
             .create_physical_plan(&logical_plan)
             .expect("creating physical plan");
 
-        let results = datafusion::physical_plan::collect(execution_plan.clone())
+        let results = datafusion::physical_plan::collect(physical_plan.clone())
             .await
             .expect("Running");
 
-        // replace the path name, which varies test to test,a with some
-        // constant for test comparisons
-        let path = self.file.path();
-        let path_name = path.to_string_lossy();
-        let metrics = plan_metrics(execution_plan)
-            .into_iter()
-            .map(|(name, metric)| {
-                (name.replace(path_name.as_ref(), "PARQUET_FILE"), metric)
-            })
-            .collect();
+        // find the parquet metrics
+        struct MetricsFinder {
+            metrics: Option<MetricsSet>,
+        }
+        impl ExecutionPlanVisitor for MetricsFinder {
+            type Error = std::convert::Infallible;
+            fn pre_visit(
+                &mut self,
+                plan: &dyn ExecutionPlan,
+            ) -> Result<bool, Self::Error> {
+                if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+                    self.metrics = plan.metrics();
+                }
+                // stop searching once we have found the metrics
+                Ok(self.metrics.is_none())
+            }
+        }
+        let mut finder = MetricsFinder { metrics: None };
+        accept(physical_plan.as_ref(), &mut finder).unwrap();
+        let parquet_metrics = finder.metrics.unwrap();
 
         let result_rows = results.iter().map(|b| b.num_rows()).sum();
 
@@ -559,7 +567,7 @@ impl ContextWithParquet {
         let sql = sql.into();
         TestOutput {
             sql,
-            metrics,
+            parquet_metrics,
             result_rows,
             pretty_input,
             pretty_results,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 285a3a9..33788a9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2172,6 +2172,8 @@ async fn csv_explain_analyze() {
     let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
     let formatted = normalize_for_explain(&formatted);
 
+    println!("ANALYZE EXPLAIN:\n{}", formatted);
+
     // Only test basic plumbing and try to avoid having to change too
     // many things
     let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), metrics=[";
@@ -2181,7 +2183,7 @@ async fn csv_explain_analyze() {
         needle,
         formatted
     );
-    let verbose_needle = "Output Rows       | 5";
+    let verbose_needle = "Output Rows";
     assert!(
         !formatted.contains(verbose_needle),
         "found unexpected '{}' in\n{}",
@@ -2201,7 +2203,7 @@ async fn csv_explain_analyze_verbose() {
     let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
     let formatted = normalize_for_explain(&formatted);
 
-    let verbose_needle = "Output Rows       | 5";
+    let verbose_needle = "Output Rows";
     assert!(
         formatted.contains(verbose_needle),
         "did not find '{}' in\n{}",