Posted to commits@arrow.apache.org by dh...@apache.org on 2021/09/15 11:31:26 UTC

[arrow-datafusion] branch master updated: Add metrics for Limit and Projection, and CoalesceBatches (#1004)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 195b699  Add metrics for Limit and Projection, and CoalesceBatches (#1004)
195b699 is described below

commit 195b69995db8044ce283d72fb78eb6b74b8842f5
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Sep 15 07:31:21 2021 -0400

    Add metrics for Limit and Projection, and CoalesceBatches (#1004)
    
    * Add metrics for Limit and Projection, and CoalesceBatches
    
    * remove duplication
---
 datafusion/src/physical_plan/coalesce_batches.rs | 38 +++++++++++---
 datafusion/src/physical_plan/limit.rs            | 63 ++++++++++++++++++++----
 datafusion/src/physical_plan/projection.rs       | 46 ++++++++++-------
 datafusion/tests/sql.rs                          | 34 ++++++++++---
 4 files changed, 142 insertions(+), 39 deletions(-)
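
All three operators in this diff get the same wiring: the ExecutionPlan owns a
shared ExecutionPlanMetricsSet, execute() derives a per-partition BaselineMetrics
from it, the stream times its work with elapsed_compute().timer() and reports each
poll result through record_poll(), and ExecutionPlan::metrics() returns
clone_inner(). A minimal standalone sketch of that lifecycle (the public path
datafusion::physical_plan::metrics is assumed from the crate layout shown in the
diff below) looks roughly like this:

    use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

    fn main() {
        // One set per operator, shared by all of its partitions.
        let metrics = ExecutionPlanMetricsSet::new();

        // Created inside execute() for each partition and handed to the stream.
        let baseline = BaselineMetrics::new(&metrics, /* partition = */ 0);

        {
            // Scoped timer: adds the elapsed time to elapsed_compute when dropped,
            // the same way the streams below wrap their per-poll work.
            let _timer = baseline.elapsed_compute().timer();
            // ... the operator's actual work would run here ...
        }

        // In the streams, each poll result is additionally passed through
        // baseline.record_poll(poll) so output_rows is counted as batches flow out.

        // ExecutionPlan::metrics() exposes an aggregated snapshot of the values:
        let _snapshot = metrics.clone_inner();
    }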

diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs
index aee9aea..7397493 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -37,7 +37,8 @@ use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 use log::debug;
 
-use super::Statistics;
+use super::metrics::{BaselineMetrics, MetricsSet};
+use super::{metrics::ExecutionPlanMetricsSet, Statistics};
 
 /// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
 /// vectorized processing by upstream operators.
@@ -47,6 +48,8 @@ pub struct CoalesceBatchesExec {
     input: Arc<dyn ExecutionPlan>,
     /// Minimum number of rows for coalesces batches
     target_batch_size: usize,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl CoalesceBatchesExec {
@@ -55,6 +58,7 @@ impl CoalesceBatchesExec {
         Self {
             input,
             target_batch_size,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 
@@ -115,6 +119,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
             buffer: Vec::new(),
             buffered_rows: 0,
             is_closed: false,
+            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
         }))
     }
 
@@ -134,6 +139,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
         }
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         self.input.statistics()
     }
@@ -152,6 +161,8 @@ struct CoalesceBatchesStream {
     buffered_rows: usize,
     /// Whether the stream has finished returning all of its data or not
     is_closed: bool,
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
 }
 
 impl Stream for CoalesceBatchesStream {
@@ -161,6 +172,26 @@ impl Stream for CoalesceBatchesStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        let poll = self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        // we can't predict the size of incoming batches so re-use the size hint from the input
+        self.input.size_hint()
+    }
+}
+
+impl CoalesceBatchesStream {
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        // Get a clone (uses same underlying atomic) as self gets borrowed below
+        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+        // records time on drop
+        let _timer = cloned_time.timer();
+
         if self.is_closed {
             return Poll::Ready(None);
         }
@@ -221,11 +252,6 @@ impl Stream for CoalesceBatchesStream {
             }
         }
     }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        // we can't predict the size of incoming batches so re-use the size hint from the input
-        self.input.size_hint()
-    }
 }
 
 impl RecordBatchStream for CoalesceBatchesStream {
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 792b8f5..ccd719f 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -35,7 +35,10 @@ use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{
+    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
+};
 
 use async_trait::async_trait;
 
@@ -46,12 +49,18 @@ pub struct GlobalLimitExec {
     input: Arc<dyn ExecutionPlan>,
     /// Maximum number of rows to return
     limit: usize,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl GlobalLimitExec {
     /// Create a new GlobalLimitExec
     pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
-        GlobalLimitExec { input, limit }
+        GlobalLimitExec {
+            input,
+            limit,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
     }
 
     /// Input execution plan
@@ -120,8 +129,13 @@ impl ExecutionPlan for GlobalLimitExec {
             ));
         }
 
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let stream = self.input.execute(0).await?;
-        Ok(Box::pin(LimitStream::new(stream, self.limit)))
+        Ok(Box::pin(LimitStream::new(
+            stream,
+            self.limit,
+            baseline_metrics,
+        )))
     }
 
     fn fmt_as(
@@ -136,6 +150,10 @@ impl ExecutionPlan for GlobalLimitExec {
         }
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
         match input_stats {
@@ -165,12 +183,18 @@ pub struct LocalLimitExec {
     input: Arc<dyn ExecutionPlan>,
     /// Maximum number of rows to return
     limit: usize,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl LocalLimitExec {
     /// Create a new LocalLimitExec partition
     pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
-        Self { input, limit }
+        Self {
+            input,
+            limit,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
     }
 
     /// Input execution plan
@@ -219,8 +243,13 @@ impl ExecutionPlan for LocalLimitExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let stream = self.input.execute(partition).await?;
-        Ok(Box::pin(LimitStream::new(stream, self.limit)))
+        Ok(Box::pin(LimitStream::new(
+            stream,
+            self.limit,
+            baseline_metrics,
+        )))
     }
 
     fn fmt_as(
@@ -235,6 +264,10 @@ impl ExecutionPlan for LocalLimitExec {
         }
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
         match input_stats {
@@ -280,20 +313,29 @@ struct LimitStream {
     schema: SchemaRef,
     // the current number of rows which have been produced
     current_len: usize,
+    /// Execution time metrics
+    baseline_metrics: BaselineMetrics,
 }
 
 impl LimitStream {
-    fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
+    fn new(
+        input: SendableRecordBatchStream,
+        limit: usize,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         let schema = input.schema();
         Self {
             limit,
             input: Some(input),
             schema,
             current_len: 0,
+            baseline_metrics,
         }
     }
 
     fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
+        // records time on drop
+        let _timer = self.baseline_metrics.elapsed_compute().timer();
         if self.current_len == self.limit {
             self.input = None; // clear input so it can be dropped early
             None
@@ -316,14 +358,16 @@ impl Stream for LimitStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        match &mut self.input {
+        let poll = match &mut self.input {
             Some(input) => input.poll_next_unpin(cx).map(|x| match x {
                 Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
                 other => other,
             }),
             // input has been cleared
             None => Poll::Ready(None),
-        }
+        };
+
+        self.baseline_metrics.record_poll(poll)
     }
 }
 
@@ -394,7 +438,8 @@ mod tests {
 
         // limit of six needs to consume the entire first record batch
         // (5 rows) and 1 row from the second (1 row)
-        let limit_stream = LimitStream::new(Box::pin(input), 6);
+        let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let limit_stream = LimitStream::new(Box::pin(input), 6, baseline_metrics);
         assert_eq!(index.value(), 0);
 
         let results = collect(Box::pin(limit_stream)).await.unwrap();
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index 97ff83e..f247261 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -34,6 +34,7 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use super::expressions::Column;
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
 use async_trait::async_trait;
 
@@ -49,6 +50,8 @@ pub struct ProjectionExec {
     schema: SchemaRef,
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl ProjectionExec {
@@ -76,6 +79,7 @@ impl ProjectionExec {
             expr,
             schema,
             input: input.clone(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
@@ -131,6 +135,7 @@ impl ExecutionPlan for ProjectionExec {
             schema: self.schema.clone(),
             expr: self.expr.iter().map(|x| x.0.clone()).collect(),
             input: self.input.execute(partition).await?,
+            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
         }))
     }
 
@@ -159,6 +164,10 @@ impl ExecutionPlan for ProjectionExec {
         }
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn statistics(&self) -> Statistics {
         stats_projection(
             self.input.statistics(),
@@ -194,20 +203,20 @@ fn stats_projection(
     }
 }
 
-fn batch_project(
-    batch: &RecordBatch,
-    expressions: &[Arc<dyn PhysicalExpr>],
-    schema: &SchemaRef,
-) -> ArrowResult<RecordBatch> {
-    expressions
-        .iter()
-        .map(|expr| expr.evaluate(batch))
-        .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-        .collect::<Result<Vec<_>>>()
-        .map_or_else(
-            |e| Err(DataFusionError::into_arrow_external_error(e)),
-            |arrays| RecordBatch::try_new(schema.clone(), arrays),
-        )
+impl ProjectionStream {
+    fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
+        // records time on drop
+        let _timer = self.baseline_metrics.elapsed_compute().timer();
+        self.expr
+            .iter()
+            .map(|expr| expr.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()
+            .map_or_else(
+                |e| Err(DataFusionError::into_arrow_external_error(e)),
+                |arrays| RecordBatch::try_new(self.schema.clone(), arrays),
+            )
+    }
 }
 
 /// Projection iterator
@@ -215,6 +224,7 @@ struct ProjectionStream {
     schema: SchemaRef,
     expr: Vec<Arc<dyn PhysicalExpr>>,
     input: SendableRecordBatchStream,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl Stream for ProjectionStream {
@@ -224,10 +234,12 @@ impl Stream for ProjectionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        self.input.poll_next_unpin(cx).map(|x| match x {
-            Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
+        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
+            Some(Ok(batch)) => Some(self.batch_project(&batch)),
             other => other,
-        })
+        });
+
+        self.baseline_metrics.record_poll(poll)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index eaf988f..90173be 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2361,13 +2361,15 @@ async fn explain_analyze_baseline_metrics() {
                FROM aggregate_test_100 \
                WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' \
                GROUP BY c1 \
-               ORDER BY c1)";
+               ORDER BY c1) \
+               LIMIT 1";
     println!("running query: {}", sql);
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let physical_plan = ctx.create_physical_plan(&plan).unwrap();
     let results = collect(physical_plan.clone()).await.unwrap();
     let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
+    println!("Query Output:\n\n{}", formatted);
     let formatted = normalize_for_explain(&formatted);
 
     assert_metrics!(
@@ -2395,14 +2397,32 @@ async fn explain_analyze_baseline_metrics() {
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
+    assert_metrics!(
+        &formatted,
+        "GlobalLimitExec: limit=1, ",
+        "metrics=[output_rows=1, elapsed_compute="
+    );
+    assert_metrics!(
+        &formatted,
+        "ProjectionExec: expr=[COUNT(UInt8(1))",
+        "metrics=[output_rows=1, elapsed_compute="
+    );
+    assert_metrics!(
+        &formatted,
+        "CoalesceBatchesExec: target_batch_size=4096",
+        "metrics=[output_rows=5, elapsed_compute"
+    );
 
     fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
-        use datafusion::physical_plan::{
-            hash_aggregate::HashAggregateExec, sort::SortExec,
-        };
-
-        plan.as_any().downcast_ref::<SortExec>().is_some()
-            || plan.as_any().downcast_ref::<HashAggregateExec>().is_some()
+        use datafusion::physical_plan;
+
+        plan.as_any().downcast_ref::<physical_plan::sort::SortExec>().is_some()
+            || plan.as_any().downcast_ref::<physical_plan::hash_aggregate::HashAggregateExec>().is_some()
+            // CoalescePartitionsExec doesn't do any work so is not included
+            || plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
+            || plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
+            || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
     }
 
     // Validate that the recorded elapsed compute time was more than