You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/09/15 11:31:26 UTC
[arrow-datafusion] branch master updated: Add metrics for Limit and Projection, and CoalesceBatches (#1004)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 195b699 Add metrics for Limit and Projection, and CoalesceBatches (#1004)
195b699 is described below
commit 195b69995db8044ce283d72fb78eb6b74b8842f5
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Sep 15 07:31:21 2021 -0400
Add metrics for Limit and Projection, and CoalesceBatches (#1004)
* Add metrics for Limit and Projection, and CoalesceBatches
* remove duplication
---
datafusion/src/physical_plan/coalesce_batches.rs | 38 +++++++++++---
datafusion/src/physical_plan/limit.rs | 63 ++++++++++++++++++++----
datafusion/src/physical_plan/projection.rs | 46 ++++++++++-------
datafusion/tests/sql.rs | 34 ++++++++++---
4 files changed, 142 insertions(+), 39 deletions(-)
diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs
index aee9aea..7397493 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -37,7 +37,8 @@ use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::debug;
-use super::Statistics;
+use super::metrics::{BaselineMetrics, MetricsSet};
+use super::{metrics::ExecutionPlanMetricsSet, Statistics};
/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
@@ -47,6 +48,8 @@ pub struct CoalesceBatchesExec {
input: Arc<dyn ExecutionPlan>,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl CoalesceBatchesExec {
@@ -55,6 +58,7 @@ impl CoalesceBatchesExec {
Self {
input,
target_batch_size,
+ metrics: ExecutionPlanMetricsSet::new(),
}
}
@@ -115,6 +119,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
buffer: Vec::new(),
buffered_rows: 0,
is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
}
@@ -134,6 +139,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
}
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
self.input.statistics()
}
@@ -152,6 +161,8 @@ struct CoalesceBatchesStream {
buffered_rows: usize,
/// Whether the stream has finished returning all of its data or not
is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
}
impl Stream for CoalesceBatchesStream {
@@ -161,6 +172,26 @@ impl Stream for CoalesceBatchesStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl CoalesceBatchesStream {
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+ // Get a clone (uses same underlying atomic) as self gets borrowed below
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ // records time on drop
+ let _timer = cloned_time.timer();
+
if self.is_closed {
return Poll::Ready(None);
}
@@ -221,11 +252,6 @@ impl Stream for CoalesceBatchesStream {
}
}
}
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- // we can't predict the size of incoming batches so re-use the size hint from the input
- self.input.size_hint()
- }
}
impl RecordBatchStream for CoalesceBatchesStream {
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 792b8f5..ccd719f 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -35,7 +35,10 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{
+ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
use async_trait::async_trait;
@@ -46,12 +49,18 @@ pub struct GlobalLimitExec {
input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
limit: usize,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl GlobalLimitExec {
/// Create a new GlobalLimitExec
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
- GlobalLimitExec { input, limit }
+ GlobalLimitExec {
+ input,
+ limit,
+ metrics: ExecutionPlanMetricsSet::new(),
+ }
}
/// Input execution plan
@@ -120,8 +129,13 @@ impl ExecutionPlan for GlobalLimitExec {
));
}
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let stream = self.input.execute(0).await?;
- Ok(Box::pin(LimitStream::new(stream, self.limit)))
+ Ok(Box::pin(LimitStream::new(
+ stream,
+ self.limit,
+ baseline_metrics,
+ )))
}
fn fmt_as(
@@ -136,6 +150,10 @@ impl ExecutionPlan for GlobalLimitExec {
}
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
match input_stats {
@@ -165,12 +183,18 @@ pub struct LocalLimitExec {
input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
limit: usize,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl LocalLimitExec {
/// Create a new LocalLimitExec partition
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
- Self { input, limit }
+ Self {
+ input,
+ limit,
+ metrics: ExecutionPlanMetricsSet::new(),
+ }
}
/// Input execution plan
@@ -219,8 +243,13 @@ impl ExecutionPlan for LocalLimitExec {
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let stream = self.input.execute(partition).await?;
- Ok(Box::pin(LimitStream::new(stream, self.limit)))
+ Ok(Box::pin(LimitStream::new(
+ stream,
+ self.limit,
+ baseline_metrics,
+ )))
}
fn fmt_as(
@@ -235,6 +264,10 @@ impl ExecutionPlan for LocalLimitExec {
}
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
match input_stats {
@@ -280,20 +313,29 @@ struct LimitStream {
schema: SchemaRef,
// the current number of rows which have been produced
current_len: usize,
+ /// Execution time metrics
+ baseline_metrics: BaselineMetrics,
}
impl LimitStream {
- fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
+ fn new(
+ input: SendableRecordBatchStream,
+ limit: usize,
+ baseline_metrics: BaselineMetrics,
+ ) -> Self {
let schema = input.schema();
Self {
limit,
input: Some(input),
schema,
current_len: 0,
+ baseline_metrics,
}
}
fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
+ // records time on drop
+ let _timer = self.baseline_metrics.elapsed_compute().timer();
if self.current_len == self.limit {
self.input = None; // clear input so it can be dropped early
None
@@ -316,14 +358,16 @@ impl Stream for LimitStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- match &mut self.input {
+ let poll = match &mut self.input {
Some(input) => input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
other => other,
}),
// input has been cleared
None => Poll::Ready(None),
- }
+ };
+
+ self.baseline_metrics.record_poll(poll)
}
}
@@ -394,7 +438,8 @@ mod tests {
// limit of six needs to consume the entire first record batch
// (5 rows) and 1 row from the second (1 row)
- let limit_stream = LimitStream::new(Box::pin(input), 6);
+ let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let limit_stream = LimitStream::new(Box::pin(input), 6, baseline_metrics);
assert_eq!(index.value(), 0);
let results = collect(Box::pin(limit_stream)).await.unwrap();
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index 97ff83e..f247261 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -34,6 +34,7 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use super::expressions::Column;
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use async_trait::async_trait;
@@ -49,6 +50,8 @@ pub struct ProjectionExec {
schema: SchemaRef,
/// The input plan
input: Arc<dyn ExecutionPlan>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl ProjectionExec {
@@ -76,6 +79,7 @@ impl ProjectionExec {
expr,
schema,
input: input.clone(),
+ metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -131,6 +135,7 @@ impl ExecutionPlan for ProjectionExec {
schema: self.schema.clone(),
expr: self.expr.iter().map(|x| x.0.clone()).collect(),
input: self.input.execute(partition).await?,
+ baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
}
@@ -159,6 +164,10 @@ impl ExecutionPlan for ProjectionExec {
}
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
fn statistics(&self) -> Statistics {
stats_projection(
self.input.statistics(),
@@ -194,20 +203,20 @@ fn stats_projection(
}
}
-fn batch_project(
- batch: &RecordBatch,
- expressions: &[Arc<dyn PhysicalExpr>],
- schema: &SchemaRef,
-) -> ArrowResult<RecordBatch> {
- expressions
- .iter()
- .map(|expr| expr.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()
- .map_or_else(
- |e| Err(DataFusionError::into_arrow_external_error(e)),
- |arrays| RecordBatch::try_new(schema.clone(), arrays),
- )
+impl ProjectionStream {
+ fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
+ // records time on drop
+ let _timer = self.baseline_metrics.elapsed_compute().timer();
+ self.expr
+ .iter()
+ .map(|expr| expr.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()
+ .map_or_else(
+ |e| Err(DataFusionError::into_arrow_external_error(e)),
+ |arrays| RecordBatch::try_new(self.schema.clone(), arrays),
+ )
+ }
}
/// Projection iterator
@@ -215,6 +224,7 @@ struct ProjectionStream {
schema: SchemaRef,
expr: Vec<Arc<dyn PhysicalExpr>>,
input: SendableRecordBatchStream,
+ baseline_metrics: BaselineMetrics,
}
impl Stream for ProjectionStream {
@@ -224,10 +234,12 @@ impl Stream for ProjectionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.input.poll_next_unpin(cx).map(|x| match x {
- Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
+ let poll = self.input.poll_next_unpin(cx).map(|x| match x {
+ Some(Ok(batch)) => Some(self.batch_project(&batch)),
other => other,
- })
+ });
+
+ self.baseline_metrics.record_poll(poll)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index eaf988f..90173be 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2361,13 +2361,15 @@ async fn explain_analyze_baseline_metrics() {
FROM aggregate_test_100 \
WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
GROUP BY c1 \
- ORDER BY c1)";
+ ORDER BY c1) \
+ LIMIT 1";
println!("running query: {}", sql);
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let physical_plan = ctx.create_physical_plan(&plan).unwrap();
let results = collect(physical_plan.clone()).await.unwrap();
let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
+ println!("Query Output:\n\n{}", formatted);
let formatted = normalize_for_explain(&formatted);
assert_metrics!(
@@ -2395,14 +2397,32 @@ async fn explain_analyze_baseline_metrics() {
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
+ assert_metrics!(
+ &formatted,
+ "GlobalLimitExec: limit=1, ",
+ "metrics=[output_rows=1, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "ProjectionExec: expr=[COUNT(UInt8(1))",
+ "metrics=[output_rows=1, elapsed_compute="
+ );
+ assert_metrics!(
+ &formatted,
+ "CoalesceBatchesExec: target_batch_size=4096",
+ "metrics=[output_rows=5, elapsed_compute"
+ );
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
- use datafusion::physical_plan::{
- hash_aggregate::HashAggregateExec, sort::SortExec,
- };
-
- plan.as_any().downcast_ref::<SortExec>().is_some()
- || plan.as_any().downcast_ref::<HashAggregateExec>().is_some()
+ use datafusion::physical_plan;
+
+ plan.as_any().downcast_ref::<physical_plan::sort::SortExec>().is_some()
+ || plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
+ // CoalescePartitionsExec doesn't do any work so is not included
+ || plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
+ || plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
+ || plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
+ || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
}
// Validate that the recorded elapsed compute time was more than