You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/25 20:11:29 UTC

[arrow-datafusion] branch master updated: fixes #933 replace placeholder fmt_as for ExecutionPlan impls (#939)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 74d2942  fixes #933 replace placeholder fmt_as for ExecutionPlan impls (#939)
74d2942 is described below

commit 74d29427a569a6a838a61c0b9794ec9095483f3d
Author: Tiphaine Ruy <ti...@users.noreply.github.com>
AuthorDate: Wed Aug 25 22:11:23 2021 +0200

    fixes #933 replace placeholder fmt_as for ExecutionPlan impls (#939)
    
    * fixes #933 replace placeholder fmt_as for ExecutionPlan impls
    
    * Add window_expr vec to fmt_as for window_agg_exec
---
 .../core/src/execution_plans/distributed_query.rs  | 19 ++++++++++-
 ballista/rust/executor/src/collect.rs              | 16 ++++++++-
 datafusion/src/physical_plan/json.rs               | 13 ++++++++
 datafusion/src/physical_plan/planner.rs            | 13 ++++++++
 datafusion/src/physical_plan/union.rs              | 14 +++++++-
 .../src/physical_plan/windows/window_agg_exec.rs   | 23 +++++++++++--
 datafusion/src/test/exec.rs                        | 39 +++++++++++++++++++++-
 datafusion/tests/provider_filter_pushdown.rs       | 16 ++++++++-
 8 files changed, 146 insertions(+), 7 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 8abfe66..7793ad9 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -34,7 +34,8 @@ use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 use async_trait::async_trait;
@@ -186,6 +187,22 @@ impl ExecutionPlan for DistributedQueryExec {
             };
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "DistributedQueryExec: scheduler_url={}",
+                    self.scheduler_url
+                )
+            }
+        }
+    }
 }
 
 async fn fetch_partition(
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index a4c544f..e9448c8 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -27,7 +27,9 @@ use datafusion::arrow::{
     datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
 };
 use datafusion::error::DataFusionError;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
 use datafusion::{error::Result, physical_plan::RecordBatchStream};
 use futures::stream::SelectAll;
 use futures::Stream;
@@ -102,6 +104,18 @@ impl ExecutionPlan for CollectExec {
             select_all: Box::pin(futures::stream::select_all(streams)),
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "CollectExec")
+            }
+        }
+    }
 }
 
 struct MergedRecordBatchStream {
diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs
index ed9b0b0..24631c5 100644
--- a/datafusion/src/physical_plan/json.rs
+++ b/datafusion/src/physical_plan/json.rs
@@ -19,6 +19,7 @@
 use async_trait::async_trait;
 use futures::Stream;
 
+use super::DisplayFormatType;
 use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
@@ -311,6 +312,18 @@ impl ExecutionPlan for NdJsonExec {
             }
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "NdJsonExec: source={:?}", self.source)
+            }
+        }
+    }
 }
 
 struct NdJsonStream<R: Read> {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index dda0b66..1cc3625 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -1386,6 +1386,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 mod tests {
     use super::*;
     use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
+    use crate::physical_plan::DisplayFormatType;
     use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
     use crate::scalar::ScalarValue;
     use crate::{
@@ -1787,6 +1788,18 @@ mod tests {
         async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
             unimplemented!("NoOpExecutionPlan::execute");
         }
+
+        fn fmt_as(
+            &self,
+            t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            match t {
+                DisplayFormatType::Default => {
+                    write!(f, "NoOpExecutionPlan")
+                }
+            }
+        }
     }
 
     //  Produces an execution plan where the schema is mismatched from
diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs
index cbab728..932bd5c 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -25,7 +25,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::SchemaRef;
 
-use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use super::{DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream};
 use crate::error::Result;
 use async_trait::async_trait;
 
@@ -94,6 +94,18 @@ impl ExecutionPlan for UnionExec {
             partition
         )))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnionExec")
+            }
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs
index 2ff1f34..c746647 100644
--- a/datafusion/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -19,8 +19,8 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    common, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, WindowExpr,
+    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, WindowExpr,
 };
 use arrow::{
     array::ArrayRef,
@@ -143,6 +143,25 @@ impl ExecutionPlan for WindowAggExec {
         ));
         Ok(stream)
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "WindowAggExec: ")?;
+                let g: Vec<String> = self
+                    .window_expr
+                    .iter()
+                    .map(|e| format!("{}: {:?}", e.name().to_owned(), e.field()))
+                    .collect();
+                write!(f, "wdw=[{}]", g.join(", "))?;
+            }
+        }
+        Ok(())
+    }
 }
 
 fn create_schema(
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 247dab1..fa1f36c 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -33,7 +33,8 @@ use arrow::{
 use futures::Stream;
 
 use crate::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 use crate::{
     error::{DataFusionError, Result},
@@ -190,6 +191,18 @@ impl ExecutionPlan for MockExec {
         // returned stream simply reads off the rx stream
         Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "MockExec")
+            }
+        }
+    }
 }
 
 fn clone_error(e: &ArrowError) -> ArrowError {
@@ -281,6 +294,18 @@ impl ExecutionPlan for BarrierExec {
         // returned stream simply reads off the rx stream
         Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "BarrierExec")
+            }
+        }
+    }
 }
 
 /// A mock execution plan that errors on a call to execute
@@ -331,4 +356,16 @@ impl ExecutionPlan for ErrorExec {
             partition
         )))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "ErrorExec")
+            }
+        }
+    }
 }
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index 0bf67be..07b0eb2 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -26,7 +26,9 @@ use datafusion::error::Result;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::Expr;
 use datafusion::physical_plan::common::SizedRecordBatchStream;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
 use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
 use std::sync::Arc;
@@ -84,6 +86,18 @@ impl ExecutionPlan for CustomPlan {
             self.batches.clone(),
         )))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
+            }
+        }
+    }
 }
 
 #[derive(Clone)]