You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/04 08:54:11 UTC

[arrow-datafusion] branch master updated: Show physical plan with metrics in benchmark (#662)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a5b3a81  Show physical plan with metrics in benchmark (#662)
a5b3a81 is described below

commit a5b3a81127d5f93e82c80bb7ba07770e761874ae
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Jul 4 02:53:44 2021 -0600

    Show physical plan with metrics in benchmark (#662)
---
 benchmarks/src/bin/tpch.rs              | 15 +++++++---
 datafusion/src/physical_plan/display.rs | 50 +++++++++++++++++++++++++++++----
 2 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 77c69f0..a52b6d2 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -42,6 +42,7 @@ use datafusion::prelude::*;
 
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -343,21 +344,27 @@ async fn execute_query(
     debug: bool,
 ) -> Result<Vec<RecordBatch>> {
     if debug {
-        println!("Logical plan:\n{:?}", plan);
+        println!("=== Logical plan ===\n{:?}\n", plan);
     }
     let plan = ctx.optimize(plan)?;
     if debug {
-        println!("Optimized logical plan:\n{:?}", plan);
+        println!("=== Optimized logical plan ===\n{:?}\n", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan)?;
     if debug {
         println!(
-            "Physical plan:\n{}",
+            "=== Physical plan ===\n{}\n",
             displayable(physical_plan.as_ref()).indent().to_string()
         );
     }
-    let result = collect(physical_plan).await?;
+    let result = collect(physical_plan.clone()).await?;
     if debug {
+        println!(
+            "=== Physical plan with metrics ===\n{}\n",
+            DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                .indent()
+                .to_string()
+        );
         pretty::print_batches(&result)?;
     }
     Ok(result)
diff --git a/datafusion/src/physical_plan/display.rs b/datafusion/src/physical_plan/display.rs
index e178ea1..8498e02 100644
--- a/datafusion/src/physical_plan/display.rs
+++ b/datafusion/src/physical_plan/display.rs
@@ -33,13 +33,27 @@ pub enum DisplayFormatType {
 /// Wraps an `ExecutionPlan` with various ways to display this plan
 pub struct DisplayableExecutionPlan<'a> {
     inner: &'a dyn ExecutionPlan,
+    /// whether to show metrics or not
+    with_metrics: bool,
 }
 
 impl<'a> DisplayableExecutionPlan<'a> {
     /// Create a wrapper around an [`'ExecutionPlan'] which can be
     /// pretty printed in a variety of ways
     pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            with_metrics: false,
+        }
+    }
+
+    /// Create a wrapper around an [`ExecutionPlan`] which is pretty
+    /// printed with the metrics of each operator appended to its line
+    pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
+        Self {
+            inner,
+            with_metrics: true,
+        }
     }
 
     /// Return a `format`able structure that produces a single line
@@ -53,15 +67,26 @@ impl<'a> DisplayableExecutionPlan<'a> {
     ///         CsvExec: source=...",
     /// ```
     pub fn indent(&self) -> impl fmt::Display + 'a {
-        struct Wrapper<'a>(&'a dyn ExecutionPlan);
+        struct Wrapper<'a> {
+            plan: &'a dyn ExecutionPlan,
+            with_metrics: bool,
+        }
         impl<'a> fmt::Display for Wrapper<'a> {
             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                 let t = DisplayFormatType::Default;
-                let mut visitor = IndentVisitor { t, f, indent: 0 };
-                accept(self.0, &mut visitor)
+                let mut visitor = IndentVisitor {
+                    t,
+                    f,
+                    indent: 0,
+                    with_metrics: self.with_metrics,
+                };
+                accept(self.plan, &mut visitor)
             }
         }
-        Wrapper(self.inner)
+        Wrapper {
+            plan: self.inner,
+            with_metrics: self.with_metrics,
+        }
     }
 }
 
@@ -71,8 +96,10 @@ struct IndentVisitor<'a, 'b> {
     t: DisplayFormatType,
     /// Write to this formatter
     f: &'a mut fmt::Formatter<'b>,
-    ///with_schema: bool,
+    /// Indent size
     indent: usize,
+    /// whether to show metrics or not
+    with_metrics: bool,
 }
 
 impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -83,6 +110,17 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
     ) -> std::result::Result<bool, Self::Error> {
         write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
         plan.fmt_as(self.t, self.f)?;
+        if self.with_metrics {
+            write!(
+                self.f,
+                ", metrics=[{}]",
+                plan.metrics()
+                    .iter()
+                    .map(|(k, v)| format!("{}={:?}", k, v.value))
+                    .collect::<Vec<_>>()
+                    .join(", ")
+            )?;
+        }
         writeln!(self.f)?;
         self.indent += 1;
         Ok(true)