You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/24 15:05:59 UTC
[arrow-datafusion] branch master updated: Improve SQLMetric APIs,
port existing metrics (#908)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bd49b86 Improve SQLMetric APIs, port existing metrics (#908)
bd49b86 is described below
commit bd49b86c48b62d3b215fa30e6fabb468928fd895
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Aug 24 11:05:51 2021 -0400
Improve SQLMetric APIs, port existing metrics (#908)
---
.../core/src/execution_plans/shuffle_reader.rs | 24 +-
.../core/src/execution_plans/shuffle_writer.rs | 60 +--
ballista/rust/core/src/utils.rs | 12 +-
datafusion/src/physical_optimizer/repartition.rs | 13 +-
datafusion/src/physical_plan/analyze.rs | 8 +
datafusion/src/physical_plan/display.rs | 69 ++-
datafusion/src/physical_plan/hash_aggregate.rs | 32 +-
datafusion/src/physical_plan/hash_join.rs | 100 ++--
datafusion/src/physical_plan/metrics/builder.rs | 150 ++++++
datafusion/src/physical_plan/metrics/mod.rs | 528 +++++++++++++++++++++
datafusion/src/physical_plan/metrics/value.rs | 261 ++++++++++
datafusion/src/physical_plan/mod.rs | 114 +----
datafusion/src/physical_plan/parquet.rs | 131 +++--
datafusion/src/physical_plan/repartition.rs | 90 ++--
datafusion/src/physical_plan/sort.rs | 44 +-
datafusion/tests/parquet_pruning.rs | 68 +--
datafusion/tests/sql.rs | 6 +-
17 files changed, 1320 insertions(+), 390 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 0447ca9..bc5dbc1 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,9 +28,10 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+use datafusion::physical_plan::metrics::{
+ ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
@@ -47,8 +48,8 @@ pub struct ShuffleReaderExec {
/// Each partition of a shuffle can read data from multiple locations
pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
- /// Time to fetch data from executor
- fetch_time: Arc<SQLMetric>,
+ /// Execution metrics; `execute()` registers a per-partition "fetch_time" timer here
+ metrics: ExecutionPlanMetricsSet,
}
impl ShuffleReaderExec {
@@ -60,7 +61,7 @@ impl ShuffleReaderExec {
Ok(Self {
partition,
schema,
- fetch_time: SQLMetric::time_nanos(),
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
}
@@ -100,13 +101,16 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);
- let start = Instant::now();
+ let fetch_time =
+ MetricBuilder::new(&self.metrics).subset_time("fetch_time", partition);
+ let timer = fetch_time.timer(); // times fetching every location of this partition
+
let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
- self.fetch_time.add_elapsed(start);
+ timer.done(); // NOTE(review): skipped when the `?` above returns early — confirm whether the guard records on drop
let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
@@ -149,10 +153,8 @@ impl ExecutionPlan for ShuffleReaderExec {
}
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
- metrics
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
}
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index b1db21f..36e445b 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -45,10 +45,13 @@ use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::metrics::{
+ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::Partitioning::RoundRobinBatch;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
+ DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
};
use futures::StreamExt;
use hashbrown::HashMap;
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
work_dir: String,
/// Optional shuffle output partitioning
shuffle_output_partitioning: Option<Partitioning>,
- /// Shuffle write metrics
- metrics: ShuffleWriteMetrics,
+ /// Execution metrics (write_time, input_rows, output_rows registered per input partition)
+ metrics: ExecutionPlanMetricsSet,
}
#[derive(Debug, Clone)]
struct ShuffleWriteMetrics {
/// Time spend writing batches to shuffle files
- write_time: Arc<SQLMetric>,
- input_rows: Arc<SQLMetric>,
- output_rows: Arc<SQLMetric>,
+ write_time: metrics::Time,
+ input_rows: metrics::Count,
+ output_rows: metrics::Count,
}
impl ShuffleWriteMetrics {
- fn new() -> Self {
+ fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+ let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+ let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
Self {
- write_time: SQLMetric::time_nanos(),
- input_rows: SQLMetric::counter(),
- output_rows: SQLMetric::counter(),
+ write_time,
+ input_rows,
+ output_rows,
}
}
}
@@ -108,7 +117,7 @@ impl ShuffleWriterExec {
plan,
work_dir,
shuffle_output_partitioning,
- metrics: ShuffleWriteMetrics::new(),
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -139,9 +148,11 @@ impl ShuffleWriterExec {
path.push(&self.job_id);
path.push(&format!("{}", self.stage_id));
+ let write_metrics = ShuffleWriteMetrics::new(input_partition, &self.metrics);
+
match &self.shuffle_output_partitioning {
None => {
- let start = Instant::now();
+ // write_time is recorded per batch inside write_stream_to_disk (below)
path.push(&format!("{}", input_partition));
std::fs::create_dir_all(&path)?;
path.push("data.arrow");
@@ -152,18 +163,18 @@ impl ShuffleWriterExec {
let stats = utils::write_stream_to_disk(
&mut stream,
path,
- self.metrics.write_time.clone(),
+ &write_metrics.write_time,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
- self.metrics
+ write_metrics
.input_rows
.add(stats.num_rows.unwrap_or(0) as usize);
- self.metrics
+ write_metrics
.output_rows
.add(stats.num_rows.unwrap_or(0) as usize);
- self.metrics.write_time.add_elapsed(start);
+ // no outer timer here: also timing the whole call would double count write_time
info!(
"Executed partition {} in {} seconds. Statistics: {}",
@@ -197,7 +208,7 @@ impl ShuffleWriterExec {
while let Some(result) = stream.next().await {
let input_batch = result?;
- self.metrics.input_rows.add(input_batch.num_rows());
+ write_metrics.input_rows.add(input_batch.num_rows());
let arrays = exprs
.iter()
@@ -239,7 +250,7 @@ impl ShuffleWriterExec {
//TODO optimize so we don't write or fetch empty partitions
//if output_batch.num_rows() > 0 {
- let start = Instant::now();
+ let timer = write_metrics.write_time.timer();
match &mut writers[output_partition] {
Some(w) => {
w.write(&output_batch)?;
@@ -260,9 +271,8 @@ impl ShuffleWriterExec {
writers[output_partition] = Some(writer);
}
}
- self.metrics.output_rows.add(output_batch.num_rows());
- self.metrics.write_time.add_elapsed(start);
- //}
+ write_metrics.output_rows.add(output_batch.num_rows());
+ timer.done();
}
}
@@ -388,12 +398,8 @@ impl ExecutionPlan for ShuffleWriterExec {
Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
- metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
- metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
- metrics
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
fn fmt_as(
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d753f70..bf0d152 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -61,7 +61,7 @@ use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
- AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
+ metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
use std::time::Instant;
@@ -71,7 +71,7 @@ use std::time::Instant;
pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
path: &str,
- disk_write_metric: Arc<SQLMetric>,
+ disk_write_metric: &metrics::Time, // accumulates time spent in writer.write() and writer.finish()
) -> Result<PartitionStats> {
let file = File::create(&path).map_err(|e| {
BallistaError::General(format!(
@@ -97,13 +97,13 @@ pub async fn write_stream_to_disk(
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;
- let start = Instant::now();
+ let timer = disk_write_metric.timer(); // times only the write of this batch
writer.write(&batch)?;
- disk_write_metric.add_elapsed(start);
+ timer.done();
}
- let start = Instant::now();
+ let timer = disk_write_metric.timer(); // times the final writer.finish()
writer.finish()?;
- disk_write_metric.add_elapsed(start);
+ timer.done();
Ok(PartitionStats::new(
Some(num_rows as u64),
Some(num_batches),
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 4504c81..984b234 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,24 +110,25 @@ mod tests {
use super::*;
use crate::datasource::datasource::Statistics;
- use crate::physical_plan::parquet::{
- ParquetExec, ParquetExecMetrics, ParquetPartition,
- };
+ use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+ use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
use crate::physical_plan::projection::ProjectionExec;
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
let schema = Arc::new(Schema::empty());
+ let metrics = ExecutionPlanMetricsSet::new(); // shared by the ParquetPartition and the ParquetExec below
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
vec![ParquetPartition::new(
vec!["x".to_string()],
Statistics::default(),
+ metrics.clone(),
)],
schema,
None,
- ParquetExecMetrics::new(),
+ metrics,
None,
2048,
None,
@@ -154,6 +155,7 @@ mod tests {
#[test]
fn repartition_deepest_node() -> Result<()> {
let schema = Arc::new(Schema::empty());
+ let metrics = ExecutionPlanMetricsSet::new(); // shared by the ParquetPartition and the ParquetExec below
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ProjectionExec::try_new(
@@ -162,10 +164,11 @@ mod tests {
vec![ParquetPartition::new(
vec!["x".to_string()],
Statistics::default(),
+ metrics.clone(),
)],
schema,
None,
- ParquetExecMetrics::new(),
+ metrics,
None,
2048,
None,
diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs
index 36726ad..d012557 100644
--- a/datafusion/src/physical_plan/analyze.rs
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -164,6 +164,14 @@ impl ExecutionPlan for AnalyzeExec {
// Verbose output
// TODO make this more sophisticated
if verbose {
+ type_builder.append_value("Plan with Full Metrics").unwrap(); // label row for the plan annotated with per-partition metrics
+
+ let annotated_plan =
+ DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
+ .indent()
+ .to_string();
+ plan_builder.append_value(annotated_plan).unwrap();
+
type_builder.append_value("Output Rows").unwrap();
plan_builder.append_value(total_rows.to_string()).unwrap();
diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs
index e251e4e..5ff99e5 100644
--- a/datafusion/src/physical_plan/display.rs
+++ b/datafusion/src/physical_plan/display.rs
@@ -35,8 +35,8 @@ pub enum DisplayFormatType {
/// Wraps an `ExecutionPlan` with various ways to display this plan
pub struct DisplayableExecutionPlan<'a> {
inner: &'a dyn ExecutionPlan,
- /// whether to show metrics or not
- with_metrics: bool,
+ /// How to show metrics
+ show_metrics: ShowMetrics,
}
impl<'a> DisplayableExecutionPlan<'a> {
@@ -45,16 +45,27 @@ impl<'a> DisplayableExecutionPlan<'a> {
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
- with_metrics: false,
+ show_metrics: ShowMetrics::None,
}
}
/// Create a wrapper around an [`'ExecutionPlan'] which can be
- /// pretty printed in a variety of ways
+ /// pretty printed in a variety of ways that also shows aggregated
+ /// metrics
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
- with_metrics: true,
+ show_metrics: ShowMetrics::Aggregated,
+ }
+ }
+
+ /// Create a wrapper around an [`ExecutionPlan`] which can be
+ /// pretty printed in a variety of ways that also shows all low
+ /// level metrics
+ pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
+ Self {
+ inner,
+ show_metrics: ShowMetrics::Full,
}
}
@@ -71,7 +82,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
pub fn indent(&self) -> impl fmt::Display + 'a {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
- with_metrics: bool,
+ show_metrics: ShowMetrics,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -80,18 +91,30 @@ impl<'a> DisplayableExecutionPlan<'a> {
t,
f,
indent: 0,
- with_metrics: self.with_metrics,
+ show_metrics: self.show_metrics,
};
accept(self.plan, &mut visitor)
}
}
Wrapper {
plan: self.inner,
- with_metrics: self.with_metrics,
+ show_metrics: self.show_metrics,
}
}
}
+#[derive(Debug, Clone, Copy)]
+enum ShowMetrics {
+ /// Do not show any metrics
+ None,
+
+ /// Show metrics aggregated across all partitions
+ Aggregated,
+
+ /// Show full per-partition metrics
+ Full,
+}
+
/// Formats plans with a single line per node.
struct IndentVisitor<'a, 'b> {
/// How to format each node
@@ -100,8 +123,8 @@ struct IndentVisitor<'a, 'b> {
f: &'a mut fmt::Formatter<'b>,
/// Indent size
indent: usize,
- /// whether to show metrics or not
- with_metrics: bool,
+ /// How to show metrics
+ show_metrics: ShowMetrics,
}
impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -112,16 +135,22 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
) -> std::result::Result<bool, Self::Error> {
write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
plan.fmt_as(self.t, self.f)?;
- if self.with_metrics {
- write!(
- self.f,
- ", metrics=[{}]",
- plan.metrics()
- .iter()
- .map(|(k, v)| format!("{}={:?}", k, v.value))
- .collect::<Vec<_>>()
- .join(", ")
- )?;
+ match self.show_metrics {
+ ShowMetrics::None => {}
+ ShowMetrics::Aggregated => {
+ if let Some(metrics) = plan.metrics() {
+ write!(self.f, ", metrics=[{}]", metrics.aggregate_by_partition())?;
+ } else {
+ write!(self.f, ", metrics=[]")?; // plan reports no metrics at all
+ }
+ }
+ ShowMetrics::Full => {
+ if let Some(metrics) = plan.metrics() {
+ write!(self.f, ", metrics=[{}]", metrics)?;
+ } else {
+ write!(self.f, ", metrics=[]")?; // plan reports no metrics at all
+ }
+ }
}
writeln!(self.f)?;
self.indent += 1;
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 1c07f61..5a2ec9e 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -27,13 +27,12 @@ use futures::{
stream::{Stream, StreamExt},
Future,
};
-use hashbrown::HashMap;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::{
Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
- Partitioning, PhysicalExpr, SQLMetric,
+ Partitioning, PhysicalExpr,
};
use crate::scalar::ScalarValue;
@@ -51,6 +50,7 @@ use pin_project_lite::pin_project;
use async_trait::async_trait;
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
/// Hash aggregate modes
@@ -86,8 +86,8 @@ pub struct HashAggregateExec {
/// same as input.schema() but for the final aggregate it will be the same as the input
/// to the partial aggregate
input_schema: SchemaRef,
- /// Metric to track number of output rows
- output_rows: Arc<SQLMetric>,
+ /// Execution metrics; `execute()` registers a per-partition output_rows counter here
+ metrics: ExecutionPlanMetricsSet,
}
fn create_schema(
@@ -136,8 +136,6 @@ impl HashAggregateExec {
let schema = Arc::new(schema);
- let output_rows = SQLMetric::counter();
-
Ok(HashAggregateExec {
mode,
group_expr,
@@ -145,7 +143,7 @@ impl HashAggregateExec {
input,
schema,
input_schema,
- output_rows,
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -209,6 +207,8 @@ impl ExecutionPlan for HashAggregateExec {
let input = self.input.execute(partition).await?;
let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
+ let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition); // only passed to the grouped stream below
+
if self.group_expr.is_empty() {
Ok(Box::pin(HashAggregateStream::new(
self.mode,
@@ -223,7 +223,7 @@ impl ExecutionPlan for HashAggregateExec {
group_expr,
self.aggr_expr.clone(),
input,
- self.output_rows.clone(),
+ output_rows,
)))
}
}
@@ -246,10 +246,8 @@ impl ExecutionPlan for HashAggregateExec {
}
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
- metrics
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
fn fmt_as(
@@ -317,7 +315,7 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
- output_rows: Arc<SQLMetric>,
+ output_rows: metrics::Count,
}
}
@@ -526,7 +524,7 @@ impl GroupedHashAggregateStream {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
- output_rows: Arc<SQLMetric>,
+ output_rows: metrics::Count,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
@@ -1072,9 +1070,9 @@ mod tests {
assert_batches_sorted_eq!(&expected, &result);
- let metrics = merged_aggregate.metrics();
- let output_rows = metrics.get("outputRows").unwrap();
- assert_eq!(3, output_rows.value());
+ let metrics = merged_aggregate.metrics().unwrap();
+ let output_rows = metrics.output_rows().unwrap();
+ assert_eq!(3, output_rows);
Ok(())
}
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 9970824..3a0a3f2 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -35,7 +35,6 @@ use std::{time::Instant, vec};
use async_trait::async_trait;
use futures::{Stream, StreamExt, TryStreamExt};
-use hashbrown::HashMap;
use tokio::sync::Mutex;
use arrow::array::Array;
@@ -51,12 +50,15 @@ use arrow::array::{
use hashbrown::raw::RawTable;
-use super::expressions::Column;
use super::hash_utils::create_hashes;
use super::{
coalesce_partitions::CoalescePartitionsExec,
hash_utils::{build_join_schema, check_join_is_valid, JoinOn},
};
+use super::{
+ expressions::Column,
+ metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::JoinType;
@@ -65,7 +67,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::{PhysicalExpr, SQLMetric};
+use crate::physical_plan::PhysicalExpr;
use log::debug;
use std::fmt;
@@ -111,33 +113,45 @@ pub struct HashJoinExec {
random_state: RandomState,
/// Partitioning mode to use
mode: PartitionMode,
- /// Metrics
- metrics: Arc<HashJoinMetrics>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
/// Metrics for HashJoinExec
#[derive(Debug)]
struct HashJoinMetrics {
/// Total time for joining probe-side batches to the build-side batches
- join_time: Arc<SQLMetric>,
+ join_time: metrics::Time,
/// Number of batches consumed by this operator
- input_batches: Arc<SQLMetric>,
+ input_batches: metrics::Count,
/// Number of rows consumed by this operator
- input_rows: Arc<SQLMetric>,
+ input_rows: metrics::Count,
/// Number of batches produced by this operator
- output_batches: Arc<SQLMetric>,
+ output_batches: metrics::Count,
/// Number of rows produced by this operator
- output_rows: Arc<SQLMetric>,
+ output_rows: metrics::Count,
}
impl HashJoinMetrics {
- fn new() -> Self {
+ pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+ let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
+
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+ let output_batches =
+ MetricBuilder::new(metrics).counter("output_batches", partition);
+
+ let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
Self {
- join_time: SQLMetric::time_nanos(),
- input_batches: SQLMetric::counter(),
- input_rows: SQLMetric::counter(),
- output_batches: SQLMetric::counter(),
- output_rows: SQLMetric::counter(),
+ join_time,
+ input_batches,
+ input_rows,
+ output_batches,
+ output_rows,
}
}
}
@@ -187,7 +201,7 @@ impl HashJoinExec {
build_side: Arc::new(Mutex::new(None)),
random_state,
mode: partition_mode,
- metrics: Arc::new(HashJoinMetrics::new()),
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -425,7 +439,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices,
self.random_state.clone(),
visited_left_side,
- self.metrics.clone(),
+ HashJoinMetrics::new(partition, &self.metrics),
)))
}
@@ -445,20 +459,8 @@ impl ExecutionPlan for HashJoinExec {
}
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone());
- metrics.insert(
- "inputBatches".to_owned(),
- (*self.metrics.input_batches).clone(),
- );
- metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
- metrics.insert(
- "outputBatches".to_owned(),
- (*self.metrics.output_batches).clone(),
- );
- metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
- metrics
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
}
@@ -522,7 +524,7 @@ struct HashJoinStream {
/// There is nothing to process anymore and left side is processed in case of left join
is_exhausted: bool,
/// Metrics
- metrics: Arc<HashJoinMetrics>,
+ join_metrics: HashJoinMetrics,
}
#[allow(clippy::too_many_arguments)]
@@ -537,7 +539,7 @@ impl HashJoinStream {
column_indices: Vec<ColumnIndex>,
random_state: RandomState,
visited_left_side: Vec<bool>,
- metrics: Arc<HashJoinMetrics>,
+ join_metrics: HashJoinMetrics,
) -> Self {
HashJoinStream {
schema,
@@ -550,7 +552,7 @@ impl HashJoinStream {
random_state,
visited_left_side,
is_exhausted: false,
- metrics,
+ join_metrics,
}
}
}
@@ -876,7 +878,7 @@ impl Stream for HashJoinStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let start = Instant::now();
+ let timer = self.join_metrics.join_time.timer();
let result = build_batch(
&batch,
&self.left_data,
@@ -887,14 +889,12 @@ impl Stream for HashJoinStream {
&self.column_indices,
&self.random_state,
);
- self.metrics.input_batches.add(1);
- self.metrics.input_rows.add(batch.num_rows());
+ self.join_metrics.input_batches.add(1);
+ self.join_metrics.input_rows.add(batch.num_rows());
if let Ok((ref batch, ref left_side)) = result {
- self.metrics
- .join_time
- .add(start.elapsed().as_millis() as usize);
- self.metrics.output_batches.add(1);
- self.metrics.output_rows.add(batch.num_rows());
+ timer.done();
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(batch.num_rows());
match self.join_type {
JoinType::Left
@@ -911,7 +911,7 @@ impl Stream for HashJoinStream {
Some(result.map(|x| x.0))
}
other => {
- let start = Instant::now();
+ let timer = self.join_metrics.join_time.timer();
// For the left join, produce rows for unmatched rows
match self.join_type {
JoinType::Left
@@ -928,16 +928,14 @@ impl Stream for HashJoinStream {
self.join_type != JoinType::Semi,
);
if let Ok(ref batch) = result {
- self.metrics.input_batches.add(1);
- self.metrics.input_rows.add(batch.num_rows());
+ self.join_metrics.input_batches.add(1);
+ self.join_metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.metrics
- .join_time
- .add(start.elapsed().as_millis() as usize);
- self.metrics.output_batches.add(1);
- self.metrics.output_rows.add(batch.num_rows());
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(batch.num_rows());
}
}
+ timer.done();
self.is_exhausted = true;
return Some(result);
}
diff --git a/datafusion/src/physical_plan/metrics/builder.rs b/datafusion/src/physical_plan/metrics/builder.rs
new file mode 100644
index 0000000..34392c7
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builder for creating arbitrary metrics
+
+use std::{borrow::Cow, sync::Arc};
+
+use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
+
+/// Structure for constructing metrics, counters, timers, etc.
+///
+/// Note the use of `Cow<..>` is to avoid allocations in the common
+/// case of constant strings
+///
+/// ```rust
+/// use datafusion::physical_plan::metrics::*;
+///
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// let partition = 1;
+///
+/// // Create the standard output_rows metric
+/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
+///
+/// // Create an operator-specific counter with some labels
+/// let num_bytes = MetricBuilder::new(&metrics)
+/// .with_new_label("filename", "my_awesome_file.parquet")
+/// .counter("num_bytes", partition);
+///
+/// ```
+pub struct MetricBuilder<'a> {
+ /// Location that the metric created by this builder will be added to
+ metrics: &'a ExecutionPlanMetricsSet,
+
+ /// optional partition number
+ partition: Option<usize>,
+
+ /// arbitrary name=value pairs identifying this metric
+ labels: Vec<Label>,
+}
+
+impl<'a> MetricBuilder<'a> {
+ /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics`
+ pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
+ Self {
+ metrics,
+ partition: None,
+ labels: vec![],
+ }
+ }
+
+ /// Add a label to the metric being constructed
+ pub fn with_label(mut self, label: Label) -> Self {
+ self.labels.push(label);
+ self
+ }
+
+ /// Add a label built from `name` and `value` to the metric being constructed
+ pub fn with_new_label(
+ self,
+ name: impl Into<Cow<'static, str>>,
+ value: impl Into<Cow<'static, str>>,
+ ) -> Self {
+ self.with_label(Label::new(name.into(), value.into()))
+ }
+
+ /// Set the partition of the metric being constructed
+ pub fn with_partition(mut self, partition: usize) -> Self {
+ self.partition = Some(partition);
+ self
+ }
+
+ /// Consume self and create a metric of the specified value
+ /// registered with the MetricsSet
+ pub fn build(self, value: MetricValue) {
+ let Self {
+ labels,
+ partition,
+ metrics,
+ } = self;
+ let metric = Arc::new(Metric::new_with_labels(value, partition, labels));
+ metrics.register(metric);
+ }
+
+ /// Consume self and create a new counter for recording output rows
+ pub fn output_rows(self, partition: usize) -> Count {
+ let count = Count::new();
+ self.with_partition(partition)
+ .build(MetricValue::OutputRows(count.clone()));
+ count
+ }
+
+ /// Consumes self and creates a new [`Count`] for recording some
+ /// arbitrary metric of an operator.
+ pub fn counter(
+ self,
+ counter_name: impl Into<Cow<'static, str>>,
+ partition: usize,
+ ) -> Count {
+ self.with_partition(partition).global_counter(counter_name)
+ }
+
+ /// Consumes self and creates a new [`Count`] for recording a
+ /// metric of an overall operator (not per partition)
+ pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
+ let count = Count::new();
+ self.build(MetricValue::Count {
+ name: counter_name.into(),
+ count: count.clone(),
+ });
+ count
+ }
+
+ /// Consume self and create a new Timer for recording the overall cpu time
+ /// spent by an operator
+ pub fn cpu_time(self, partition: usize) -> Time {
+ let time = Time::new();
+ self.with_partition(partition)
+ .build(MetricValue::CPUTime(time.clone()));
+ time
+ }
+
+ /// Consumes self and creates a new Timer for recording some
+ /// subset of an operator's execution time.
+ pub fn subset_time(
+ self,
+ subset_name: impl Into<Cow<'static, str>>,
+ partition: usize,
+ ) -> Time {
+ let time = Time::new();
+ self.with_partition(partition).build(MetricValue::Time {
+ name: subset_name.into(),
+ time: time.clone(),
+ });
+ time
+ }
+}
diff --git a/datafusion/src/physical_plan/metrics/mod.rs b/datafusion/src/physical_plan/metrics/mod.rs
new file mode 100644
index 0000000..7dd92a4
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -0,0 +1,528 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+mod builder;
+mod value;
+
+use std::{
+ borrow::Cow,
+ fmt::{Debug, Display},
+ sync::{Arc, Mutex},
+};
+
+use hashbrown::HashMap;
+
+// public exports
+pub use builder::MetricBuilder;
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+
+/// Something that tracks a value of interest (metric) of a DataFusion
+/// [`ExecutionPlan`] execution.
+///
+/// Typically [`Metric`]s are not created directly, but instead
+/// are created using [`MetricBuilder`] or methods on
+/// [`ExecutionPlanMetricsSet`].
+///
+/// ```
+/// use datafusion::physical_plan::metrics::*;
+///
+/// let metrics = ExecutionPlanMetricsSet::new();
+/// assert!(metrics.clone_inner().output_rows().is_none());
+///
+/// // Create a counter to increment using the MetricBuilder
+/// let partition = 1;
+/// let output_rows = MetricBuilder::new(&metrics)
+/// .output_rows(partition);
+///
+/// // Counter can be incremented
+/// output_rows.add(13);
+///
+/// // The value can be retrieved directly:
+/// assert_eq!(output_rows.value(), 13);
+///
+/// // As well as from the metrics set
+/// assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// ```
+
+#[derive(Debug)]
+pub struct Metric {
+ /// The value of the metric
+ value: MetricValue,
+
+ /// Arbitrary name=value pairs identifying this metric
+ labels: Vec<Label>,
+
+ /// The partition of the operator's output this metric applies to;
+ /// `None` means it applies to all partitions.
+ partition: Option<usize>,
+}
+
+impl Display for Metric {
+ /// Renders `name{partition=N, label=value, ...}=value`; the braces are
+ /// omitted entirely when there is neither a partition nor any label.
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.value.name())?;
+
+ // the partition, when present, is shown as the first label
+ let partition_label = self
+ .partition
+ .map(|p| Label::new("partition", p.to_string()));
+ let all_labels: Vec<Label> = partition_label
+ .into_iter()
+ .chain(self.labels.iter().cloned())
+ .collect();
+
+ if !all_labels.is_empty() {
+ f.write_str("{")?;
+ for (idx, label) in all_labels.iter().enumerate() {
+ if idx > 0 {
+ f.write_str(", ")?;
+ }
+ write!(f, "{}", label)?;
+ }
+ f.write_str("}")?;
+ }
+
+ // finally the value itself
+ write!(f, "={}", self.value)
+ }
+}
+
+impl Metric {
+ /// Creates an unlabelled [`Metric`]. Prefer [`MetricBuilder`] over
+ /// calling this directly.
+ pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
+ Self::new_with_labels(value, partition, Vec::new())
+ }
+
+ /// Creates a [`Metric`] carrying `labels`. Prefer [`MetricBuilder`]
+ /// over calling this directly.
+ pub fn new_with_labels(
+ value: MetricValue,
+ partition: Option<usize>,
+ labels: Vec<Label>,
+ ) -> Self {
+ Self {
+ value,
+ partition,
+ labels,
+ }
+ }
+
+ /// Appends `label` to this metric's labels and returns the metric
+ pub fn with(self, label: Label) -> Self {
+ let mut updated = self;
+ updated.labels.push(label);
+ updated
+ }
+
+ /// The labels attached to this metric (the partition is stored
+ /// separately and is not included here)
+ fn labels(&self) -> &[Label] {
+ self.labels.as_slice()
+ }
+
+ /// Borrows the value of this metric
+ pub fn value(&self) -> &MetricValue {
+ &self.value
+ }
+
+ /// Mutably borrows the value of this metric
+ pub fn value_mut(&mut self) -> &mut MetricValue {
+ &mut self.value
+ }
+
+ /// Borrows the partition this metric applies to (`None` = all)
+ pub fn partition(&self) -> &Option<usize> {
+ &self.partition
+ }
+}
+
+/// A snapshot of the metrics for a particular operator (`dyn
+/// ExecutionPlan`).
+///
+/// Cloning a `MetricsSet` clones the `Arc<Metric>` handles, so the clone
+/// shares the same `Metric` instances as the original.
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+ /// All metrics in this set
+ metrics: Vec<Arc<Metric>>,
+}
+
+impl MetricsSet {
+ /// Create a new, empty container of metrics
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Add the specified metric
+ pub fn push(&mut self, metric: Arc<Metric>) {
+ self.metrics.push(metric)
+ }
+
+ /// Returns an iterator across all metrics
+ pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
+ self.metrics.iter()
+ }
+
+ /// Convenience: return the number of rows produced, summed
+ /// across partitions, or `None` if no `output_rows` metric is present
+ pub fn output_rows(&self) -> Option<usize> {
+ self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
+ .map(|v| v.as_usize())
+ }
+
+ /// Convenience: return the CPU time spent (in nanoseconds), summed
+ /// across partitions, or `None` if no `cpu_time` metric is present
+ pub fn cpu_time(&self) -> Option<usize> {
+ self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))
+ .map(|v| v.as_usize())
+ }
+
+ /// Sums the values of all metrics for which `f(metric)` returns
+ /// true. Returns `None` if no metric matches the predicate.
+ ///
+ /// Panics (via [`MetricValue::add`]) if the matching metrics are of
+ /// different kinds.
+ pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
+ where
+ F: FnMut(&Metric) -> bool,
+ {
+ let mut iter = self
+ .metrics
+ .iter()
+ .filter(|metric| f(metric.as_ref()))
+ .peekable();
+
+ // the accumulator takes the kind (and name) of the first match
+ let mut accum = match iter.peek() {
+ None => {
+ return None;
+ }
+ Some(metric) => metric.value().new_empty(),
+ };
+
+ iter.for_each(|metric| accum.add(metric.value()));
+
+ Some(accum)
+ }
+
+ /// Returns a new derived `MetricsSet` in which all metrics sharing the
+ /// same name and labels have been summed across partitions. Every
+ /// metric in the result has `partition = None`.
+ ///
+ /// Metrics appear in the result in the order their (name, labels) key
+ /// is first seen in `self`, so the result (and its `Display` output)
+ /// is deterministic.
+ ///
+ /// Panics (via [`MetricValue::add`]) if two metrics with the same name
+ /// and labels are of different kinds.
+ pub fn aggregate_by_partition(&self) -> Self {
+ // (name, labels) -> index of the accumulator in `aggregated`.
+ // A plain HashMap iteration order would be randomized, so the map
+ // only indexes into an insertion-ordered Vec.
+ let mut positions: HashMap<(&str, Vec<Label>), usize> = HashMap::new();
+ let mut aggregated: Vec<Metric> = Vec::new();
+
+ for metric in &self.metrics {
+ let key = (metric.value().name(), metric.labels().to_vec());
+ let idx = *positions.entry(key).or_insert_with(|| {
+ // first time this key is seen: start an empty accumulator
+ // of the same kind, with no partition
+ aggregated.push(Metric::new_with_labels(
+ metric.value().new_empty(),
+ None,
+ metric.labels().to_vec(),
+ ));
+ aggregated.len() - 1
+ });
+ aggregated[idx].value_mut().add(metric.value());
+ }
+
+ Self {
+ metrics: aggregated.into_iter().map(Arc::new).collect(),
+ }
+ }
+}
+
+impl Display for MetricsSet {
+ /// Formats every metric in the set, separated by `, `
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut separator = "";
+ for metric in &self.metrics {
+ write!(f, "{}{}", separator, metric)?;
+ separator = ", ";
+ }
+ Ok(())
+ }
+}
+
+/// A set of [`Metric`] for an individual "operator" (e.g. `&dyn
+/// ExecutionPlan`).
+///
+/// This structure is intended as a convenience for [`ExecutionPlan`]
+/// implementations so they can generate different streams for multiple
+/// partitions but easily report them together.
+///
+/// Each `clone()` of this structure will add metrics to the same
+/// underlying metrics set.
+#[derive(Default, Debug, Clone)]
+pub struct ExecutionPlanMetricsSet {
+ /// The shared, lock-protected metrics set. Cloning `Self` clones the
+ /// `Arc`, so every clone registers into and reads from this same set.
+ inner: Arc<Mutex<MetricsSet>>,
+}
+
+impl ExecutionPlanMetricsSet {
+ /// Creates an empty metrics set that can be shared by cloning
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Adds `metric` to the shared set; all clones of this
+ /// `ExecutionPlanMetricsSet` will observe it
+ pub fn register(&self, metric: Arc<Metric>) {
+ let mut guard = self.inner.lock().expect("not poisoned");
+ guard.push(metric);
+ }
+
+ /// Returns a point-in-time copy of the underlying [`MetricsSet`]
+ pub fn clone_inner(&self) -> MetricsSet {
+ MetricsSet::clone(&self.inner.lock().expect("not poisoned"))
+ }
+}
+
+/// name=value pairs identifying a metric. This concept is called various things
+/// in various different systems:
+///
+/// "labels" in
+/// [prometheus](https://prometheus.io/docs/concepts/data_model/),
+/// "tags" in
+/// [InfluxDB](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/),
+/// "attributes" in [open
+/// telemetry](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md),
+/// etc.
+///
+/// As the name and value are expected to mostly be constant strings,
+/// use a `Cow` to avoid copying / allocations in this common case.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Label {
+ /// The label name (the part before `=` when displayed)
+ name: Cow<'static, str>,
+ /// The label value (the part after `=` when displayed)
+ value: Cow<'static, str>,
+}
+
+impl Label {
+ /// Creates the label `name=value`. Each part may be a `&'static str`
+ /// (borrowed, no allocation) or an owned `String`.
+ pub fn new(
+ name: impl Into<Cow<'static, str>>,
+ value: impl Into<Cow<'static, str>>,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ value: value.into(),
+ }
+ }
+}
+
+impl Display for Label {
+ /// Renders the label as `name=value`
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self { name, value } = self;
+ write!(f, "{}={}", name, value)
+ }
+}
+
+// Unit tests for `Metric` display formatting, `MetricsSet` summing and
+// aggregation of metrics across partitions.
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use super::*;
+
+ #[test]
+ fn test_display_no_labels_no_partition() {
+ let count = Count::new();
+ count.add(33);
+ let value = MetricValue::OutputRows(count);
+ let partition = None;
+ let metric = Metric::new(value, partition);
+
+ assert_eq!("output_rows=33", metric.to_string())
+ }
+
+ #[test]
+ fn test_display_no_labels_with_partition() {
+ let count = Count::new();
+ count.add(44);
+ let value = MetricValue::OutputRows(count);
+ let partition = Some(1);
+ let metric = Metric::new(value, partition);
+
+ assert_eq!("output_rows{partition=1}=44", metric.to_string())
+ }
+
+ #[test]
+ fn test_display_labels_no_partition() {
+ let count = Count::new();
+ count.add(55);
+ let value = MetricValue::OutputRows(count);
+ let partition = None;
+ let label = Label::new("foo", "bar");
+ let metric = Metric::new_with_labels(value, partition, vec![label]);
+
+ assert_eq!("output_rows{foo=bar}=55", metric.to_string())
+ }
+
+ #[test]
+ fn test_display_labels_and_partition() {
+ let count = Count::new();
+ count.add(66);
+ let value = MetricValue::OutputRows(count);
+ let partition = Some(2);
+ let label = Label::new("foo", "bar");
+ let metric = Metric::new_with_labels(value, partition, vec![label]);
+
+ assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
+ }
+
+ #[test]
+ fn test_output_rows() {
+ let metrics = ExecutionPlanMetricsSet::new();
+ assert!(metrics.clone_inner().output_rows().is_none());
+
+ let partition = 1;
+ let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
+ output_rows.add(13);
+
+ let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
+ output_rows.add(7);
+ assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
+ }
+
+ #[test]
+ fn test_cpu_time() {
+ let metrics = ExecutionPlanMetricsSet::new();
+ assert!(metrics.clone_inner().cpu_time().is_none());
+
+ let partition = 1;
+ let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition);
+ cpu_time.add_duration(Duration::from_nanos(1234));
+
+ let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition + 1);
+ cpu_time.add_duration(Duration::from_nanos(6));
+ assert_eq!(metrics.clone_inner().cpu_time().unwrap(), 1240);
+ }
+
+ #[test]
+ fn test_sum() {
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let count1 = MetricBuilder::new(&metrics)
+ .with_new_label("foo", "bar")
+ .counter("my_counter", 1);
+ count1.add(1);
+
+ let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
+ count2.add(2);
+
+ let metrics = metrics.clone_inner();
+ assert!(metrics.sum(|_| false).is_none());
+
+ let expected_count = Count::new();
+ expected_count.add(3);
+ let expected_sum = MetricValue::Count {
+ name: "my_counter".into(),
+ count: expected_count,
+ };
+
+ assert_eq!(metrics.sum(|_| true), Some(expected_sum));
+ }
+
+ #[test]
+ #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
+ fn test_bad_sum() {
+ // can not add different kinds of metrics
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
+ count.add(1);
+
+ let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
+ time.add_duration(Duration::from_nanos(10));
+
+ // expect that this will error out
+ metrics.clone_inner().sum(|_| true);
+ }
+
+ #[test]
+ fn test_aggregate_partition() {
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ // Note cpu_time1 has labels so it is not aggregated with 2 and 3
+ let cpu_time1 = MetricBuilder::new(&metrics)
+ .with_new_label("foo", "bar")
+ .cpu_time(1);
+ cpu_time1.add_duration(Duration::from_nanos(12));
+
+ let cpu_time2 = MetricBuilder::new(&metrics).cpu_time(2);
+ cpu_time2.add_duration(Duration::from_nanos(34));
+
+ let cpu_time3 = MetricBuilder::new(&metrics).cpu_time(4);
+ cpu_time3.add_duration(Duration::from_nanos(56));
+
+ let output_rows = MetricBuilder::new(&metrics).output_rows(1); // output rows
+ output_rows.add(56);
+
+ let aggregated = metrics.clone_inner().aggregate_by_partition();
+
+ // cpu time should be aggregated:
+ let cpu_times = aggregated
+ .iter()
+ .filter(|metric| {
+ matches!(metric.value(), MetricValue::CPUTime(_))
+ && metric.labels().is_empty()
+ })
+ .collect::<Vec<_>>();
+ assert_eq!(cpu_times.len(), 1);
+ assert_eq!(cpu_times[0].value().as_usize(), 34 + 56);
+ assert!(cpu_times[0].partition().is_none());
+
+ // output rows should also be aggregated into a single metric with no partition
+ let output_rows = aggregated
+ .iter()
+ .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
+ .collect::<Vec<_>>();
+ assert_eq!(output_rows.len(), 1);
+ assert_eq!(output_rows[0].value().as_usize(), 56);
+ assert!(output_rows[0].partition.is_none())
+ }
+
+ #[test]
+ #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
+ fn test_aggregate_partition_bad_sum() {
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
+ count.add(1);
+
+ let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
+ time.add_duration(Duration::from_nanos(10));
+
+ // can't aggregate time and count -- expect a panic
+ metrics.clone_inner().aggregate_by_partition();
+ }
+}
diff --git a/datafusion/src/physical_plan/metrics/value.rs b/datafusion/src/physical_plan/metrics/value.rs
new file mode 100644
index 0000000..61a7c2b
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Value representation of metrics
+
+use std::{
+ borrow::{Borrow, Cow},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
+ time::{Duration, Instant},
+};
+
+/// A counter to record things such as number of input or output rows
+///
+/// Note that `clone`ing a counter yields a handle to the same underlying
+/// value: updates made through any clone are visible through all of them.
+#[derive(Debug, Clone)]
+pub struct Count {
+ /// value of the metric counter
+ value: std::sync::Arc<AtomicUsize>,
+}
+
+impl PartialEq for Count {
+ /// Two counters are equal when their current values are equal
+ fn eq(&self, other: &Self) -> bool {
+ self.value().eq(&other.value())
+ }
+}
+
+impl Default for Count {
+ /// A fresh counter starting at zero (same as [`Count::new`])
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Count {
+ /// Create a new counter starting at zero
+ pub fn new() -> Self {
+ Self {
+ value: Arc::new(AtomicUsize::new(0)),
+ }
+ }
+
+ /// Add `n` to the metric's value
+ pub fn add(&self, n: usize) {
+ // relaxed ordering for operations on `value` poses no issues
+ // we're purely using atomic ops with no associated memory ops
+ self.value.fetch_add(n, Ordering::Relaxed);
+ }
+
+ /// Get the current value
+ pub fn value(&self) -> usize {
+ self.value.load(Ordering::Relaxed)
+ }
+}
+
+/// Measure a potentially non contiguous duration of time
+///
+/// Like [`Count`], `clone`ing a `Time` yields a handle to the same
+/// underlying accumulator.
+#[derive(Debug, Clone)]
+pub struct Time {
+ /// elapsed time, in nanoseconds
+ nanos: Arc<AtomicUsize>,
+}
+
+impl PartialEq for Time {
+ /// Two `Time`s are equal when their current nanosecond totals are equal
+ fn eq(&self, other: &Self) -> bool {
+ self.value().eq(&other.value())
+ }
+}
+
+impl Default for Time {
+ /// A fresh timer with zero elapsed time (same as [`Time::new`])
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Time {
+ /// Create a new [`Time`] wrapper suitable for recording elapsed
+ /// times for operations.
+ pub fn new() -> Self {
+ Self {
+ nanos: Arc::new(AtomicUsize::new(0)),
+ }
+ }
+
+ /// Add the nanoseconds elapsed since `start` to self
+ pub fn add_elapsed(&self, start: Instant) {
+ self.add_duration(start.elapsed());
+ }
+
+ /// Add `duration` to self.
+ ///
+ /// The nanosecond count (a `u128`) is converted with `as usize`, so
+ /// durations that do not fit in a `usize` are truncated.
+ pub fn add_duration(&self, duration: Duration) {
+ let more_nanos = duration.as_nanos() as usize;
+ self.nanos.fetch_add(more_nanos, Ordering::Relaxed);
+ }
+
+ /// Add the number of nanoseconds of other `Time` to self
+ pub fn add(&self, other: &Time) {
+ self.nanos.fetch_add(other.value(), Ordering::Relaxed);
+ }
+
+ /// Return a scoped guard that adds the amount of time elapsed
+ /// between its creation and its drop (or first call to `stop`) to
+ /// this metric.
+ pub fn timer(&self) -> ScopedTimerGuard<'_> {
+ ScopedTimerGuard {
+ inner: self,
+ start: Some(Instant::now()),
+ }
+ }
+
+ /// Get the number of nanoseconds recorded by this Time metric
+ pub fn value(&self) -> usize {
+ self.nanos.load(Ordering::Relaxed)
+ }
+}
+
+/// RAII guard returned by [`Time::timer`]. It adds the time elapsed since
+/// its creation to the [`Time`] it was created from, either on the first
+/// call to `stop`/`done` or when it is dropped, whichever comes first.
+/// Any later `stop` is a no-op.
+#[must_use = "the guard records elapsed time when stopped or dropped; an unbound guard is dropped immediately"]
+#[derive(Debug)]
+pub struct ScopedTimerGuard<'a> {
+ /// metric the elapsed time is added to
+ inner: &'a Time,
+ /// start instant; `None` once the elapsed time has been recorded
+ start: Option<Instant>,
+}
+
+impl<'a> ScopedTimerGuard<'a> {
+ /// Stop the timer and record the time taken. Only the first call
+ /// records anything.
+ pub fn stop(&mut self) {
+ // `take` ensures the elapsed time is recorded at most once
+ if let Some(start) = self.start.take() {
+ self.inner.add_elapsed(start)
+ }
+ }
+
+ /// Stop the timer, record the time taken and consume self
+ pub fn done(mut self) {
+ self.stop()
+ }
+}
+
+impl<'a> Drop for ScopedTimerGuard<'a> {
+ /// Records the elapsed time if `stop`/`done` was not already called
+ fn drop(&mut self) {
+ self.stop()
+ }
+}
+
+/// Possible values for a metric.
+///
+/// Among other differences, the metric types have different ways to
+/// logically interpret their underlying values and some metrics are
+/// so common they are given special treatment.
+///
+/// The derived `PartialEq` compares variants and names, and compares the
+/// wrapped [`Count`]/[`Time`] by their current values.
+#[derive(Debug, Clone, PartialEq)]
+pub enum MetricValue {
+ /// Number of output rows produced: "output_rows" metric
+ OutputRows(Count),
+ /// CPU time: the "cpu_time" metric
+ CPUTime(Time),
+ /// Operator defined count.
+ Count {
+ /// The provided name of this metric
+ name: Cow<'static, str>,
+ /// The value of the metric
+ count: Count,
+ },
+ /// Operator defined time
+ Time {
+ /// The provided name of this metric
+ name: Cow<'static, str>,
+ /// The value of the metric
+ time: Time,
+ },
+ // TODO timestamp, etc
+ // https://github.com/apache/arrow-datafusion/issues/866
+}
+
+impl MetricValue {
+ /// Returns the name this metric is reported under: a fixed name for
+ /// the built-in variants, the user-supplied name otherwise
+ pub fn name(&self) -> &str {
+ match self {
+ Self::OutputRows(_) => "output_rows",
+ Self::CPUTime(_) => "cpu_time",
+ Self::Count { name, .. } | Self::Time { name, .. } => name.borrow(),
+ }
+ }
+
+ /// Returns the raw value as a `usize`: the count for counters, the
+ /// accumulated nanoseconds for timers
+ pub fn as_usize(&self) -> usize {
+ match self {
+ Self::OutputRows(count) | Self::Count { count, .. } => count.value(),
+ Self::CPUTime(time) | Self::Time { time, .. } => time.value(),
+ }
+ }
+
+ /// Returns a zero-valued `MetricValue` of the same variant (and name)
+ /// as `self`, backed by fresh storage, suitable as an accumulator
+ pub fn new_empty(&self) -> Self {
+ match self {
+ Self::OutputRows(_) => Self::OutputRows(Count::new()),
+ Self::CPUTime(_) => Self::CPUTime(Time::new()),
+ Self::Count { name, .. } => {
+ let name = name.clone();
+ Self::Count {
+ name,
+ count: Count::new(),
+ }
+ }
+ Self::Time { name, .. } => {
+ let name = name.clone();
+ Self::Time {
+ name,
+ time: Time::new(),
+ }
+ }
+ }
+ }
+
+ /// Adds the value of `other` to `self`.
+ ///
+ /// Counter-like values (`OutputRows`, `Count`) combine only with each
+ /// other, as do time-like values (`CPUTime`, `Time`) — and only within
+ /// the same variant. Panics on any other combination.
+ ///
+ /// Takes `&mut self` even though the storage is atomic, so the type
+ /// system marks this as a mutation; live metrics should be updated
+ /// through the [`Count`] or [`Time`] handle they were created from.
+ pub fn add(&mut self, other: &Self) {
+ match (self, other) {
+ (Self::OutputRows(lhs), Self::OutputRows(rhs))
+ | (Self::Count { count: lhs, .. }, Self::Count { count: rhs, .. }) => {
+ lhs.add(rhs.value())
+ }
+ (Self::CPUTime(lhs), Self::CPUTime(rhs))
+ | (Self::Time { time: lhs, .. }, Self::Time { time: rhs, .. }) => {
+ lhs.add(rhs)
+ }
+ (lhs, rhs) => panic!(
+ "Mismatched metric types. Can not aggregate {:?} with value {:?}",
+ lhs, rhs
+ ),
+ }
+ }
+}
+
+impl std::fmt::Display for MetricValue {
+ /// Prints only the value: counts as plain integers, times as the
+ /// `Debug` rendering of a `Duration` (e.g. `1.5ms`)
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::CPUTime(time) | Self::Time { time, .. } => {
+ let elapsed = Duration::from_nanos(time.value() as u64);
+ write!(f, "{:?}", elapsed)
+ }
+ Self::OutputRows(count) | Self::Count { count, .. } => {
+ let total = count.value();
+ write!(f, "{}", total)
+ }
+ }
+ }
+}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 8f7db72..82c297c 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,14 +17,8 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use std::{any::Any, pin::Pin};
-
+pub use self::metrics::Metric;
+use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
@@ -42,7 +36,12 @@ use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
pub use display::DisplayFormatType;
use futures::stream::Stream;
-use hashbrown::HashMap;
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -87,72 +86,6 @@ impl Stream for EmptyRecordBatchStream {
}
}
-/// SQL metric type
-#[derive(Debug, Clone)]
-pub enum MetricType {
- /// Simple counter
- Counter,
- /// Wall clock time in nanoseconds
- TimeNanos,
-}
-
-/// SQL metric such as counter (number of input or output rows) or timing information about
-/// a physical operator.
-#[derive(Debug)]
-pub struct SQLMetric {
- /// Metric value
- value: AtomicUsize,
- /// Metric type
- metric_type: MetricType,
-}
-
-impl Clone for SQLMetric {
- fn clone(&self) -> Self {
- Self {
- value: AtomicUsize::new(self.value.load(Ordering::Relaxed)),
- metric_type: self.metric_type.clone(),
- }
- }
-}
-
-impl SQLMetric {
- // relaxed ordering for operations on `value` poses no issues
- // we're purely using atomic ops with no associated memory ops
-
- /// Create a new metric for tracking a counter
- pub fn counter() -> Arc<SQLMetric> {
- Arc::new(SQLMetric::new(MetricType::Counter))
- }
-
- /// Create a new metric for tracking time in nanoseconds
- pub fn time_nanos() -> Arc<SQLMetric> {
- Arc::new(SQLMetric::new(MetricType::TimeNanos))
- }
-
- /// Create a new SQLMetric
- pub fn new(metric_type: MetricType) -> Self {
- Self {
- value: AtomicUsize::new(0),
- metric_type,
- }
- }
-
- /// Add to the value
- pub fn add(&self, n: usize) {
- self.value.fetch_add(n, Ordering::Relaxed);
- }
-
- /// Add elapsed nanoseconds since `start`to self
- pub fn add_elapsed(&self, start: std::time::Instant) {
- self.add(start.elapsed().as_nanos() as usize)
- }
-
- /// Get the current value
- pub fn value(&self) -> usize {
- self.value.load(Ordering::Relaxed)
- }
-}
-
/// Physical planner interface
pub use self::planner::PhysicalPlanner;
@@ -193,9 +126,19 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// creates an iterator
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
- /// Return a snapshot of the metrics collected during execution
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- HashMap::new()
+ /// Return a snapshot of the set of [`Metric`]s for this
+ /// [`ExecutionPlan`].
+ ///
+ /// While the values of the metrics in the returned
+ /// [`MetricsSet`]s may change as execution progresses, the
+ /// specific metrics will not.
+ ///
+ /// Once `self.execute()` has returned (technically the future is
+ /// resolved) for all available partitions, the set of metrics
+ /// should be complete. If this function is called prior to
+ /// `execute()` new metrics may appear in subsequent calls.
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
}
/// Format this `ExecutionPlan` to `f` in the specified type.
@@ -330,20 +273,6 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
Ok(())
}
-/// Recursively gateher all execution metrics from this plan and all of its input plans
-pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric> {
- fn get_metrics_inner(
- plan: &dyn ExecutionPlan,
- mut metrics: HashMap<String, SQLMetric>,
- ) -> HashMap<String, SQLMetric> {
- metrics.extend(plan.metrics().into_iter());
- plan.children().into_iter().fold(metrics, |metrics, child| {
- get_metrics_inner(child.as_ref(), metrics)
- })
- }
- get_metrics_inner(plan.as_ref(), HashMap::new())
-}
-
/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
let stream = execute_stream(plan).await?;
@@ -663,6 +592,7 @@ pub mod json;
pub mod limit;
pub mod math_expressions;
pub mod memory;
+pub mod metrics;
pub mod parquet;
pub mod planner;
pub mod projection;
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ff8bb5b..f85495c 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -38,7 +38,6 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use hashbrown::HashMap;
use log::debug;
use parquet::file::{
metadata::RowGroupMetaData,
@@ -57,8 +56,8 @@ use tokio::{
use crate::datasource::datasource::{ColumnStatistics, Statistics};
use async_trait::async_trait;
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::stream::RecordBatchReceiverStream;
-use super::SQLMetric;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::Accumulator;
@@ -75,8 +74,8 @@ pub struct ParquetExec {
batch_size: usize,
/// Statistics for the data set (sum of statistics for all partitions)
statistics: Statistics,
- /// metrics for the overall execution
- metrics: ParquetExecMetrics,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
/// Optional predicate builder
predicate_builder: Option<PruningPredicate>,
/// Optional limit of the number of rows
@@ -99,23 +98,16 @@ pub struct ParquetPartition {
/// Statistics for this partition
pub statistics: Statistics,
/// Execution metrics
- metrics: ParquetPartitionMetrics,
-}
-
-/// Stores metrics about the overall parquet execution
-#[derive(Debug, Clone)]
-pub struct ParquetExecMetrics {
- /// Numer of times the pruning predicate could not be created
- pub predicate_creation_errors: Arc<SQLMetric>,
+ metrics: ExecutionPlanMetricsSet,
}
-/// Stores metrics about the parquet execution for a particular ParquetPartition
+/// Stores metrics about the parquet execution for a particular parquet file
#[derive(Debug, Clone)]
-struct ParquetPartitionMetrics {
+struct ParquetFileMetrics {
/// Numer of times the predicate could not be evaluated
- pub predicate_evaluation_errors: Arc<SQLMetric>,
+ pub predicate_evaluation_errors: metrics::Count,
/// Number of row groups pruned using
- pub row_groups_pruned: Arc<SQLMetric>,
+ pub row_groups_pruned: metrics::Count,
}
impl ParquetExec {
@@ -167,6 +159,7 @@ impl ParquetExec {
filenames, projection, predicate, limit);
// build a list of Parquet partitions with statistics and gather all unique schemas
// used in this data set
+ let metrics = ExecutionPlanMetricsSet::new();
let mut schemas: Vec<Schema> = vec![];
let mut partitions = Vec::with_capacity(max_concurrency);
let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
@@ -393,7 +386,11 @@ impl ParquetExec {
};
// remove files that are not needed in case of limit
filenames.truncate(total_files);
- partitions.push(ParquetPartition::new(filenames, statistics));
+ partitions.push(ParquetPartition::new(
+ filenames,
+ statistics,
+ metrics.clone(),
+ ));
if limit_exhausted {
break;
}
@@ -410,7 +407,10 @@ impl ParquetExec {
)));
}
let schema = Arc::new(schemas.pop().unwrap());
- let metrics = ParquetExecMetrics::new();
+
+ let metrics = ExecutionPlanMetricsSet::new();
+ let predicate_creation_errors =
+ MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
let predicate_builder = predicate.and_then(|predicate_expr| {
match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
@@ -420,7 +420,7 @@ impl ParquetExec {
"Could not create pruning predicate for {:?}: {}",
predicate_expr, e
);
- metrics.predicate_creation_errors.add(1);
+ predicate_creation_errors.add(1);
None
}
}
@@ -442,7 +442,7 @@ impl ParquetExec {
partitions: Vec<ParquetPartition>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
- metrics: ParquetExecMetrics,
+ metrics: ExecutionPlanMetricsSet,
predicate_builder: Option<PruningPredicate>,
batch_size: usize,
limit: Option<usize>,
@@ -582,11 +582,15 @@ impl ParquetExec {
impl ParquetPartition {
/// Create a new parquet partition
- pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+ pub fn new(
+ filenames: Vec<String>,
+ statistics: Statistics,
+ metrics: ExecutionPlanMetricsSet,
+ ) -> Self {
Self {
filenames,
statistics,
- metrics: ParquetPartitionMetrics::new(),
+ metrics,
}
}
@@ -601,21 +605,24 @@ impl ParquetPartition {
}
}
-impl ParquetExecMetrics {
+impl ParquetFileMetrics {
/// Create new metrics
- pub fn new() -> Self {
- Self {
- predicate_creation_errors: SQLMetric::counter(),
- }
- }
-}
+ pub fn new(
+ partition: usize,
+ filename: &str,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Self {
+ let predicate_evaluation_errors = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("predicate_evaluation_errors", partition);
+
+ let row_groups_pruned = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("row_groups_pruned", partition);
-impl ParquetPartitionMetrics {
- /// Create new metrics
- pub fn new() -> Self {
Self {
- predicate_evaluation_errors: SQLMetric::counter(),
- row_groups_pruned: SQLMetric::counter(),
+ predicate_evaluation_errors,
+ row_groups_pruned,
}
}
}
@@ -663,9 +670,9 @@ impl ExecutionPlan for ParquetExec {
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);
- let partition = &self.partitions[partition];
- let filenames = partition.filenames.clone();
- let metrics = partition.metrics.clone();
+ let parquet_partition = &self.partitions[partition];
+ let filenames = parquet_partition.filenames.clone();
+ let metrics = self.metrics.clone();
let projection = self.projection.clone();
let predicate_builder = self.predicate_builder.clone();
let batch_size = self.batch_size;
@@ -673,6 +680,7 @@ impl ExecutionPlan for ParquetExec {
task::spawn_blocking(move || {
if let Err(e) = read_files(
+ partition,
&filenames,
metrics,
&projection,
@@ -714,29 +722,8 @@ impl ExecutionPlan for ParquetExec {
}
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- self.partitions
- .iter()
- .flat_map(|p| {
- vec![
- (
- format!(
- "numPredicateEvaluationErrors for {}",
- p.filenames.join(",")
- ),
- p.metrics.predicate_evaluation_errors.as_ref().clone(),
- ),
- (
- format!("numRowGroupsPruned for {}", p.filenames.join(",")),
- p.metrics.row_groups_pruned.as_ref().clone(),
- ),
- ]
- })
- .chain(std::iter::once((
- "numPredicateCreationErrors".to_string(),
- self.metrics.predicate_creation_errors.as_ref().clone(),
- )))
- .collect()
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
}
@@ -837,7 +824,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
- metrics: ParquetPartitionMetrics,
+ metrics: ParquetFileMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = predicate_builder.schema().as_ref();
@@ -865,9 +852,11 @@ fn build_row_group_predicate(
}
}
+#[allow(clippy::too_many_arguments)]
fn read_files(
+ partition: usize,
filenames: &[String],
- metrics: ParquetPartitionMetrics,
+ metrics: ExecutionPlanMetricsSet,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
batch_size: usize,
@@ -876,12 +865,13 @@ fn read_files(
) -> Result<()> {
let mut total_rows = 0;
'outer: for filename in filenames {
+ let file_metrics = ParquetFileMetrics::new(partition, filename, &metrics);
let file = File::open(&filename)?;
let mut file_reader = SerializedFileReader::new(file)?;
if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = build_row_group_predicate(
predicate_builder,
- metrics.clone(),
+ file_metrics,
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
@@ -1016,6 +1006,11 @@ mod tests {
Ok(())
}
+ fn parquet_file_metrics() -> ParquetFileMetrics {
+ let metrics = ExecutionPlanMetricsSet::new();
+ ParquetFileMetrics::new(0, "file.parquet", &metrics)
+ }
+
#[test]
fn row_group_predicate_builder_simple_expr() -> Result<()> {
use crate::logical_plan::{col, lit};
@@ -1036,7 +1031,7 @@ mod tests {
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
- ParquetPartitionMetrics::new(),
+ parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
@@ -1069,7 +1064,7 @@ mod tests {
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
- ParquetPartitionMetrics::new(),
+ parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
@@ -1117,7 +1112,7 @@ mod tests {
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
- ParquetPartitionMetrics::new(),
+ parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
@@ -1135,7 +1130,7 @@ mod tests {
let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
- ParquetPartitionMetrics::new(),
+ parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
@@ -1182,7 +1177,7 @@ mod tests {
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
- ParquetPartitionMetrics::new(),
+ parquet_file_metrics(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index eb3fe55..8ba9a4f 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -21,17 +21,17 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use std::time::Instant;
use std::{any::Any, vec};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::hash_utils::create_hashes;
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::record_batch::RecordBatch;
use arrow::{array::Array, error::Result as ArrowResult};
use arrow::{compute::take, datatypes::SchemaRef};
use tokio_stream::wrappers::UnboundedReceiverStream;
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
@@ -63,38 +63,48 @@ pub struct RepartitionExec {
>,
/// Execution metrics
- metrics: RepartitionMetrics,
+ metrics: ExecutionPlanMetricsSet,
}
#[derive(Debug, Clone)]
struct RepartitionMetrics {
/// Time in nanos to execute child operator and fetch batches
- fetch_nanos: Arc<SQLMetric>,
+ fetch_time: metrics::Time,
/// Time in nanos to perform repartitioning
- repart_nanos: Arc<SQLMetric>,
+ repart_time: metrics::Time,
/// Time in nanos for sending resulting batches to channels
- send_nanos: Arc<SQLMetric>,
+ send_time: metrics::Time,
}
impl RepartitionMetrics {
- fn new() -> Self {
+ pub fn new(
+ output_partition: usize,
+ input_partition: usize,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Self {
+ let label = metrics::Label::new("inputPartition", input_partition.to_string());
+
+ // Time in nanos to execute child operator and fetch batches
+ let fetch_time = MetricBuilder::new(metrics)
+ .with_label(label.clone())
+ .subset_time("fetch_time", output_partition);
+
+ // Time in nanos to perform repartitioning
+ let repart_time = MetricBuilder::new(metrics)
+ .with_label(label.clone())
+ .subset_time("repart_time", output_partition);
+
+ // Time in nanos for sending resulting batches to channels
+ let send_time = MetricBuilder::new(metrics)
+ .with_label(label)
+ .subset_time("send_time", output_partition);
+
Self {
- fetch_nanos: SQLMetric::time_nanos(),
- repart_nanos: SQLMetric::time_nanos(),
- send_nanos: SQLMetric::time_nanos(),
+ fetch_time,
+ repart_time,
+ send_time,
}
}
- /// Convert into the external metrics form
- fn to_hashmap(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone());
- metrics.insert(
- "repartitionTime".to_owned(),
- self.repart_nanos.as_ref().clone(),
- );
- metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone());
- metrics
- }
}
impl RepartitionExec {
@@ -175,6 +185,8 @@ impl ExecutionPlan for RepartitionExec {
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
+ let r_metrics = RepartitionMetrics::new(partition, i, &self.metrics);
+
let input_task: JoinHandle<Result<()>> =
tokio::spawn(Self::pull_from_input(
random.clone(),
@@ -182,7 +194,7 @@ impl ExecutionPlan for RepartitionExec {
i,
txs.clone(),
self.partitioning.clone(),
- self.metrics.clone(),
+ r_metrics,
));
// In a separate task, wait for each input to be done
@@ -201,8 +213,8 @@ impl ExecutionPlan for RepartitionExec {
}))
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- self.metrics.to_hashmap()
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
fn fmt_as(
@@ -228,7 +240,7 @@ impl RepartitionExec {
input,
partitioning,
channels: Arc::new(Mutex::new(HashMap::new())),
- metrics: RepartitionMetrics::new(),
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -244,14 +256,14 @@ impl RepartitionExec {
i: usize,
mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
partitioning: Partitioning,
- metrics: RepartitionMetrics,
+ r_metrics: RepartitionMetrics,
) -> Result<()> {
let num_output_partitions = txs.len();
// execute the child operator
- let now = Instant::now();
+ let timer = r_metrics.fetch_time.timer();
let mut stream = input.execute(i).await?;
- metrics.fetch_nanos.add_elapsed(now);
+ timer.done();
let mut counter = 0;
let hashes_buf = &mut vec![];
@@ -260,9 +272,9 @@ impl RepartitionExec {
// pulling inputs
while !txs.is_empty() {
// fetch the next batch
- let now = Instant::now();
+ let timer = r_metrics.fetch_time.timer();
let result = stream.next().await;
- metrics.fetch_nanos.add_elapsed(now);
+ timer.done();
// Input is done
if result.is_none() {
@@ -272,7 +284,7 @@ impl RepartitionExec {
match &partitioning {
Partitioning::RoundRobinBatch(_) => {
- let now = Instant::now();
+ let timer = r_metrics.send_time.timer();
let output_partition = counter % num_output_partitions;
// if there is still a receiver, send to it
if let Some(tx) = txs.get_mut(&output_partition) {
@@ -281,10 +293,10 @@ impl RepartitionExec {
txs.remove(&output_partition);
}
}
- metrics.send_nanos.add_elapsed(now);
+ timer.done();
}
Partitioning::Hash(exprs, _) => {
- let now = Instant::now();
+ let timer = r_metrics.repart_time.timer();
let input_batch = result?;
let arrays = exprs
.iter()
@@ -303,11 +315,12 @@ impl RepartitionExec {
indices[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
- metrics.repart_nanos.add_elapsed(now);
+ timer.done();
+
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
- let now = Instant::now();
+ let timer = r_metrics.repart_time.timer();
let indices = partition_indices.into();
// Produce batches based on indices
let columns = input_batch
@@ -321,8 +334,9 @@ impl RepartitionExec {
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
let output_batch =
RecordBatch::try_new(input_batch.schema(), columns);
- metrics.repart_nanos.add_elapsed(now);
- let now = Instant::now();
+ timer.done();
+
+ let timer = r_metrics.send_time.timer();
// if there is still a receiver, send to it
if let Some(tx) = txs.get_mut(&num_output_partition) {
if tx.send(Some(output_batch)).is_err() {
@@ -330,7 +344,7 @@ impl RepartitionExec {
txs.remove(&num_output_partition);
}
}
- metrics.send_nanos.add_elapsed(now);
+ timer.done();
}
}
other => {
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index faaa10d..f1346b5 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -17,11 +17,12 @@
//! Defines the SORT plan
+use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
- common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+ common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
};
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
@@ -32,13 +33,11 @@ use arrow::{array::ArrayRef, error::ArrowError};
use async_trait::async_trait;
use futures::stream::Stream;
use futures::Future;
-use hashbrown::HashMap;
use pin_project_lite::pin_project;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use std::time::Instant;
/// Sort execution plan
#[derive(Debug)]
@@ -47,10 +46,8 @@ pub struct SortExec {
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
- /// Output rows
- output_rows: Arc<SQLMetric>,
- /// Time to sort batches
- sort_time_nanos: Arc<SQLMetric>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
/// Preserve partitions of input plan
preserve_partitioning: bool,
}
@@ -74,9 +71,8 @@ impl SortExec {
Self {
expr,
input,
+ metrics: ExecutionPlanMetricsSet::new(),
preserve_partitioning,
- output_rows: SQLMetric::counter(),
- sort_time_nanos: SQLMetric::time_nanos(),
}
}
@@ -157,11 +153,15 @@ impl ExecutionPlan for SortExec {
let input = self.input.execute(partition).await?;
+ let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
+
+ let cpu_time = MetricBuilder::new(&self.metrics).cpu_time(partition);
+
Ok(Box::pin(SortStream::new(
input,
self.expr.clone(),
- self.output_rows.clone(),
- self.sort_time_nanos.clone(),
+ output_rows,
+ cpu_time,
)))
}
@@ -178,11 +178,8 @@ impl ExecutionPlan for SortExec {
}
}
- fn metrics(&self) -> HashMap<String, SQLMetric> {
- let mut metrics = HashMap::new();
- metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
- metrics.insert("sortTime".to_owned(), (*self.sort_time_nanos).clone());
- metrics
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
}
}
@@ -229,7 +226,7 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
- output_rows: Arc<SQLMetric>,
+ output_rows: metrics::Count
}
}
@@ -237,8 +234,8 @@ impl SortStream {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
- output_rows: Arc<SQLMetric>,
- sort_time: Arc<SQLMetric>,
+ output_rows: metrics::Count,
+ sort_time: metrics::Time,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema = input.schema();
@@ -248,14 +245,14 @@ impl SortStream {
.await
.map_err(DataFusionError::into_arrow_external_error)
.and_then(move |batches| {
- let now = Instant::now();
+ let timer = sort_time.timer();
// combine all record batches into one for each column
let combined = common::combine_batches(&batches, schema.clone())?;
// sort combined record batch
let result = combined
.map(|batch| sort_batch(batch, schema, &expr))
.transpose()?;
- sort_time.add(now.elapsed().as_nanos() as usize);
+ timer.done();
Ok(result)
});
@@ -438,8 +435,9 @@ mod tests {
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
- assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
- assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
+ let metrics = sort_exec.metrics().unwrap();
+ assert!(metrics.cpu_time().unwrap() > 0);
+ assert_eq!(metrics.output_rows().unwrap(), 8);
assert_eq!(result.len(), 1);
let columns = result[0].columns();
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 789f081..14f5dd2 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -33,11 +33,13 @@ use chrono::{Datelike, Duration};
use datafusion::{
datasource::TableProvider,
logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder},
- physical_plan::{plan_metrics, SQLMetric},
+ physical_plan::{
+ accept, metrics::MetricsSet, parquet::ParquetExec, ExecutionPlan,
+ ExecutionPlanVisitor,
+ },
prelude::{ExecutionConfig, ExecutionContext},
scalar::ScalarValue,
};
-use hashbrown::HashMap;
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
use tempfile::NamedTempFile;
@@ -417,6 +419,9 @@ enum Scenario {
/// table "t" registered, pointing at a parquet file made with
/// `make_test_file`
struct ContextWithParquet {
+ /// Temp file the parquet data is written to. Never read; it is held
+ /// only so the file is cleaned up when this struct is dropped.
+ #[allow(dead_code)]
file: NamedTempFile,
provider: Arc<dyn TableProvider>,
ctx: ExecutionContext,
@@ -426,8 +431,8 @@ struct ContextWithParquet {
struct TestOutput {
/// The input string
sql: String,
- /// Normalized metrics (filename replaced by a constant)
- metrics: HashMap<String, SQLMetric>,
+ /// Execution metrics for the Parquet Scan
+ parquet_metrics: MetricsSet,
/// number of rows in results
result_rows: usize,
/// the contents of the input, as a string
@@ -439,32 +444,25 @@ struct TestOutput {
impl TestOutput {
/// retrieve the value of the named metric, if any
fn metric_value(&self, metric_name: &str) -> Option<usize> {
- self.metrics.get(metric_name).map(|m| m.value())
+ self.parquet_metrics
+ .sum(|metric| metric.value().name() == metric_name)
+ .map(|v| v.as_usize())
}
/// The number of times the pruning predicate evaluation errors
fn predicate_evaluation_errors(&self) -> Option<usize> {
- self.metric_value("numPredicateEvaluationErrors for PARQUET_FILE")
+ self.metric_value("predicate_evaluation_errors")
}
/// The number of times the pruning predicate evaluation errors
fn row_groups_pruned(&self) -> Option<usize> {
- self.metric_value("numRowGroupsPruned for PARQUET_FILE")
+ self.metric_value("row_groups_pruned")
}
fn description(&self) -> String {
- let metrics = self
- .metrics
- .iter()
- .map(|(name, val)| format!(" {} = {:?}", name, val))
- .collect::<Vec<_>>();
-
format!(
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
- self.pretty_input,
- self.sql,
- self.pretty_results,
- metrics.join("\n")
+ self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
)
}
}
@@ -532,25 +530,35 @@ impl ContextWithParquet {
let pretty_input = pretty_format_batches(&input).unwrap();
let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
- let execution_plan = self
+ let physical_plan = self
.ctx
.create_physical_plan(&logical_plan)
.expect("creating physical plan");
- let results = datafusion::physical_plan::collect(execution_plan.clone())
+ let results = datafusion::physical_plan::collect(physical_plan.clone())
.await
.expect("Running");
- // replace the path name, which varies test to test,a with some
- // constant for test comparisons
- let path = self.file.path();
- let path_name = path.to_string_lossy();
- let metrics = plan_metrics(execution_plan)
- .into_iter()
- .map(|(name, metric)| {
- (name.replace(path_name.as_ref(), "PARQUET_FILE"), metric)
- })
- .collect();
+ // find the parquet metrics
+ struct MetricsFinder {
+ metrics: Option<MetricsSet>,
+ }
+ impl ExecutionPlanVisitor for MetricsFinder {
+ type Error = std::convert::Infallible;
+ fn pre_visit(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ ) -> Result<bool, Self::Error> {
+ if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+ self.metrics = plan.metrics();
+ }
+ // stop searching once we have found the metrics
+ Ok(self.metrics.is_none())
+ }
+ }
+ let mut finder = MetricsFinder { metrics: None };
+ accept(physical_plan.as_ref(), &mut finder).unwrap();
+ let parquet_metrics = finder.metrics.unwrap();
let result_rows = results.iter().map(|b| b.num_rows()).sum();
@@ -559,7 +567,7 @@ impl ContextWithParquet {
let sql = sql.into();
TestOutput {
sql,
- metrics,
+ parquet_metrics,
result_rows,
pretty_input,
pretty_results,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 285a3a9..33788a9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2172,6 +2172,8 @@ async fn csv_explain_analyze() {
let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
let formatted = normalize_for_explain(&formatted);
+ println!("ANALYZE EXPLAIN:\n{}", formatted);
+
// Only test basic plumbing and try to avoid having to change too
// many things
let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), metrics=[";
@@ -2181,7 +2183,7 @@ async fn csv_explain_analyze() {
needle,
formatted
);
- let verbose_needle = "Output Rows | 5";
+ let verbose_needle = "Output Rows";
assert!(
!formatted.contains(verbose_needle),
"found unexpected '{}' in\n{}",
@@ -2201,7 +2203,7 @@ async fn csv_explain_analyze_verbose() {
let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
let formatted = normalize_for_explain(&formatted);
- let verbose_needle = "Output Rows | 5";
+ let verbose_needle = "Output Rows";
assert!(
formatted.contains(verbose_needle),
"did not find '{}' in\n{}",