You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/31 11:07:19 UTC
[arrow-datafusion] branch master updated: Add metrics for SortExect
+ HashAggregateExec (#938)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bef41bc Add metrics for SortExect + HashAggregateExec (#938)
bef41bc is described below
commit bef41bcf7de2b9c4d1c4570e8349820d7f54c0b7
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Aug 31 07:07:07 2021 -0400
Add metrics for SortExect + HashAggregateExec (#938)
* Add metrics to HashAggregateExec
* Modernize metrics for SortExec
* Consistent output regardless of cores
* use target_partitions rather than concurrency
---
datafusion/src/physical_plan/hash_aggregate.rs | 60 +++++++++----
datafusion/src/physical_plan/sort.rs | 29 ++-----
datafusion/tests/sql.rs | 111 ++++++++++++++++++++++++-
3 files changed, 160 insertions(+), 40 deletions(-)
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5a2ec9e..e21cc31 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -50,7 +50,9 @@ use pin_project_lite::pin_project;
use async_trait::async_trait;
-use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+use super::metrics::{
+ self, BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+};
use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
/// Hash aggregate modes
@@ -207,7 +209,7 @@ impl ExecutionPlan for HashAggregateExec {
let input = self.input.execute(partition).await?;
let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
- let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
if self.group_expr.is_empty() {
Ok(Box::pin(HashAggregateStream::new(
@@ -215,6 +217,7 @@ impl ExecutionPlan for HashAggregateExec {
self.schema.clone(),
self.aggr_expr.clone(),
input,
+ baseline_metrics,
)))
} else {
Ok(Box::pin(GroupedHashAggregateStream::new(
@@ -223,7 +226,7 @@ impl ExecutionPlan for HashAggregateExec {
group_expr,
self.aggr_expr.clone(),
input,
- output_rows,
+ baseline_metrics,
)))
}
}
@@ -315,7 +318,6 @@ pin_project! {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
finished: bool,
- output_rows: metrics::Count,
}
}
@@ -487,7 +489,9 @@ async fn compute_grouped_hash_aggregate(
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
mut input: SendableRecordBatchStream,
+ elapsed_compute: metrics::Time,
) -> ArrowResult<RecordBatch> {
+ let timer = elapsed_compute.timer();
// The expressions to evaluate the batch, one vec of expressions per aggregation.
// Assume create_schema() always put group columns in front of aggr columns, we set
// col_idx_base to group expression count.
@@ -499,8 +503,10 @@ async fn compute_grouped_hash_aggregate(
// iterate over all input batches and update the accumulators
let mut accumulators = Accumulators::default();
+ timer.done();
while let Some(batch) = input.next().await {
let batch = batch?;
+ let timer = elapsed_compute.timer();
accumulators = group_aggregate_batch(
&mode,
&random_state,
@@ -511,9 +517,13 @@ async fn compute_grouped_hash_aggregate(
&aggregate_expressions,
)
.map_err(DataFusionError::into_arrow_external_error)?;
+ timer.done();
}
- create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema)
+ let timer = elapsed_compute.timer();
+ let batch = create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema);
+ timer.done();
+ batch
}
impl GroupedHashAggregateStream {
@@ -524,11 +534,12 @@ impl GroupedHashAggregateStream {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
- output_rows: metrics::Count,
+ baseline_metrics: BaselineMetrics,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
tokio::spawn(async move {
let result = compute_grouped_hash_aggregate(
mode,
@@ -536,8 +547,10 @@ impl GroupedHashAggregateStream {
group_expr,
aggr_expr,
input,
+ elapsed_compute,
)
- .await;
+ .await
+ .record_output(&baseline_metrics);
tx.send(result)
});
@@ -545,7 +558,6 @@ impl GroupedHashAggregateStream {
schema,
output: rx,
finished: false,
- output_rows,
}
}
}
@@ -604,8 +616,6 @@ impl Stream for GroupedHashAggregateStream {
return Poll::Ready(None);
}
- let output_rows = self.output_rows.clone();
-
// is the output ready?
let this = self.project();
let output_poll = this.output.poll(cx);
@@ -620,10 +630,6 @@ impl Stream for GroupedHashAggregateStream {
Ok(result) => result,
};
- if let Ok(batch) = &result {
- output_rows.add(batch.num_rows())
- }
-
Poll::Ready(Some(result))
}
Poll::Pending => Poll::Pending,
@@ -720,25 +726,33 @@ async fn compute_hash_aggregate(
schema: SchemaRef,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
mut input: SendableRecordBatchStream,
+ elapsed_compute: metrics::Time,
) -> ArrowResult<RecordBatch> {
+ let timer = elapsed_compute.timer();
let mut accumulators = create_accumulators(&aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
let expressions = aggregate_expressions(&aggr_expr, &mode, 0)
.map_err(DataFusionError::into_arrow_external_error)?;
let expressions = Arc::new(expressions);
+ timer.done();
// 1 for each batch, update / merge accumulators with the expressions' values
// future is ready when all batches are computed
while let Some(batch) = input.next().await {
let batch = batch?;
+ let timer = elapsed_compute.timer();
aggregate_batch(&mode, &batch, &mut accumulators, &expressions)
.map_err(DataFusionError::into_arrow_external_error)?;
+ timer.done();
}
// 2. convert values to a record batch
- finalize_aggregation(&accumulators, &mode)
+ let timer = elapsed_compute.timer();
+ let batch = finalize_aggregation(&accumulators, &mode)
.map(|columns| RecordBatch::try_new(schema.clone(), columns))
- .map_err(DataFusionError::into_arrow_external_error)?
+ .map_err(DataFusionError::into_arrow_external_error)?;
+ timer.done();
+ batch
}
impl HashAggregateStream {
@@ -748,13 +762,23 @@ impl HashAggregateStream {
schema: SchemaRef,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
tokio::spawn(async move {
- let result =
- compute_hash_aggregate(mode, schema_clone, aggr_expr, input).await;
+ let result = compute_hash_aggregate(
+ mode,
+ schema_clone,
+ aggr_expr,
+ input,
+ elapsed_compute,
+ )
+ .await
+ .record_output(&baseline_metrics);
+
tx.send(result)
});
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index df77a16..5a47931 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -17,7 +17,9 @@
//! Defines the SORT plan
-use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
+use super::metrics::{
+ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -151,18 +153,13 @@ impl ExecutionPlan for SortExec {
}
}
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let input = self.input.execute(partition).await?;
- let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
-
- let elapsed_compute =
- MetricBuilder::new(&self.metrics).elapsed_compute(partition);
-
Ok(Box::pin(SortStream::new(
input,
self.expr.clone(),
- output_rows,
- elapsed_compute,
+ baseline_metrics,
)))
}
@@ -227,7 +224,6 @@ pin_project! {
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
- output_rows: metrics::Count
}
}
@@ -235,8 +231,7 @@ impl SortStream {
fn new(
input: SendableRecordBatchStream,
expr: Vec<PhysicalSortExpr>,
- output_rows: metrics::Count,
- sort_time: metrics::Time,
+ baseline_metrics: BaselineMetrics,
) -> Self {
let (tx, rx) = futures::channel::oneshot::channel();
let schema = input.schema();
@@ -246,13 +241,14 @@ impl SortStream {
.await
.map_err(DataFusionError::into_arrow_external_error)
.and_then(move |batches| {
- let timer = sort_time.timer();
+ let timer = baseline_metrics.elapsed_compute().timer();
// combine all record batches into one for each column
let combined = common::combine_batches(&batches, schema.clone())?;
// sort combined record batch
let result = combined
.map(|batch| sort_batch(batch, schema, &expr))
- .transpose()?;
+ .transpose()?
+ .record_output(&baseline_metrics);
timer.done();
Ok(result)
});
@@ -264,7 +260,6 @@ impl SortStream {
output: rx,
finished: false,
schema,
- output_rows,
}
}
}
@@ -273,8 +268,6 @@ impl Stream for SortStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let output_rows = self.output_rows.clone();
-
if self.finished {
return Poll::Ready(None);
}
@@ -293,10 +286,6 @@ impl Stream for SortStream {
Ok(result) => result.transpose(),
};
- if let Some(Ok(batch)) = &result {
- output_rows.add(batch.num_rows());
- }
-
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 3982790..03c5cbb 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -41,6 +41,9 @@ use arrow::{
use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::ExecutionPlanVisitor;
use datafusion::prelude::*;
use datafusion::{
datasource::{csv::CsvReadOptions, MemTable},
@@ -2194,8 +2197,6 @@ async fn csv_explain_analyze() {
let formatted = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
let formatted = normalize_for_explain(&formatted);
- println!("ANALYZE EXPLAIN:\n{}", formatted);
-
// Only test basic plumbing and try to avoid having to change too
// many things
let needle =
@@ -2221,6 +2222,112 @@ async fn csv_explain_analyze_verbose() {
assert_contains!(formatted, verbose_needle);
}
+/// A macro to assert that some particular line contains two substrings
+///
+/// Usage: `assert_metrics!(actual, operator_name, metrics)`
+///
+macro_rules! assert_metrics {
+ ($ACTUAL: expr, $OPERATOR_NAME: expr, $METRICS: expr) => {
+ let found = $ACTUAL
+ .lines()
+ .any(|line| line.contains($OPERATOR_NAME) && line.contains($METRICS));
+ assert!(
+ found,
+ "Can not find a line with both '{}' and '{}' in\n\n{}",
+ $OPERATOR_NAME, $METRICS, $ACTUAL
+ );
+ };
+}
+
+#[tokio::test]
+async fn explain_analyze_baseline_metrics() {
+ // This test uses the execute function to run an actual plan under EXPLAIN ANALYZE
+ // and then validate the presence of baseline metrics for supported operators
+ let config = ExecutionConfig::new().with_target_partitions(3);
+ let mut ctx = ExecutionContext::with_config(config);
+ register_aggregate_csv_by_sql(&mut ctx).await;
+ // a query with as many operators as we have metrics for
+ let sql = "EXPLAIN ANALYZE select count(*) from (SELECT count(*), c1 FROM aggregate_test_100 group by c1 ORDER BY c1)";
+ let plan = ctx.create_logical_plan(sql).unwrap();
+ let plan = ctx.optimize(&plan).unwrap();
+ let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+ let results = collect(physical_plan.clone()).await.unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
+ let formatted = normalize_for_explain(&formatted);
+
+ assert_metrics!(
+ &formatted,
+ "CoalescePartitionsExec",
+ "metrics=[output_rows=5, elapsed_compute=NOT RECORDED"
+ );
+ assert_metrics!(
+ &formatted,
+ "HashAggregateExec: mode=Partial, gby=[]",
+ "metrics=[output_rows=3, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
+ "metrics=[output_rows=5, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "SortExec: [c1@0 ASC]",
+ "metrics=[output_rows=5, elapsed_compute="
+ );
+
+ fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
+ use datafusion::physical_plan::{
+ hash_aggregate::HashAggregateExec, sort::SortExec,
+ };
+
+ plan.as_any().downcast_ref::<SortExec>().is_some()
+ || plan.as_any().downcast_ref::<HashAggregateExec>().is_some()
+ }
+
+ // Validate that the recorded elapsed compute time was more than
+ // zero for all operators as well as the start/end timestamp are set
+ struct TimeValidator {}
+ impl ExecutionPlanVisitor for TimeValidator {
+ type Error = std::convert::Infallible;
+
+ fn pre_visit(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ ) -> std::result::Result<bool, Self::Error> {
+ if !expected_to_have_metrics(plan) {
+ return Ok(true);
+ }
+ let metrics = plan.metrics().unwrap().aggregate_by_partition();
+
+ assert!(metrics.output_rows().unwrap() > 0);
+ assert!(metrics.elapsed_compute().unwrap() > 0);
+
+ let mut saw_start = false;
+ let mut saw_end = false;
+ metrics.iter().for_each(|m| match m.value() {
+ MetricValue::StartTimestamp(ts) => {
+ saw_start = true;
+ assert!(ts.value().unwrap().timestamp_nanos() > 0);
+ }
+ MetricValue::EndTimestamp(ts) => {
+ saw_end = true;
+ assert!(ts.value().unwrap().timestamp_nanos() > 0);
+ }
+ _ => {}
+ });
+
+ assert!(saw_start);
+ assert!(saw_end);
+
+ Ok(true)
+ }
+ }
+
+ datafusion::physical_plan::accept(physical_plan.as_ref(), &mut TimeValidator {})
+ .unwrap();
+}
+
#[tokio::test]
async fn csv_explain_plans() {
// This test verify the look of each plan in its full cycle plan creation