You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/25 20:11:29 UTC
[arrow-datafusion] branch master updated: fixes #933 replace
placeholder fmt_as for ExecutionPlan impls (#939)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 74d2942 fixes #933 replace placeholder fmt_as for ExecutionPlan impls (#939)
74d2942 is described below
commit 74d29427a569a6a838a61c0b9794ec9095483f3d
Author: Tiphaine Ruy <ti...@users.noreply.github.com>
AuthorDate: Wed Aug 25 22:11:23 2021 +0200
fixes #933 replace placeholder fmt_as for ExecutionPlan impls (#939)
* fixes #933 replace placeholder fmt_as for ExecutionPlan impls
* Add window_expr vec to fmt_as for window_agg_exec
---
.../core/src/execution_plans/distributed_query.rs | 19 ++++++++++-
ballista/rust/executor/src/collect.rs | 16 ++++++++-
datafusion/src/physical_plan/json.rs | 13 ++++++++
datafusion/src/physical_plan/planner.rs | 13 ++++++++
datafusion/src/physical_plan/union.rs | 14 +++++++-
.../src/physical_plan/windows/window_agg_exec.rs | 23 +++++++++++--
datafusion/src/test/exec.rs | 39 +++++++++++++++++++++-
datafusion/tests/provider_filter_pushdown.rs | 16 ++++++++-
8 files changed, 146 insertions(+), 7 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 8abfe66..7793ad9 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -34,7 +34,8 @@ use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::{
- ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
};
use async_trait::async_trait;
@@ -186,6 +187,22 @@ impl ExecutionPlan for DistributedQueryExec {
};
}
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "DistributedQueryExec: scheduler_url={}",
+ self.scheduler_url
+ )
+ }
+ }
+ }
}
async fn fetch_partition(
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index a4c544f..e9448c8 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -27,7 +27,9 @@ use datafusion::arrow::{
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
use datafusion::error::DataFusionError;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use datafusion::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
use datafusion::{error::Result, physical_plan::RecordBatchStream};
use futures::stream::SelectAll;
use futures::Stream;
@@ -102,6 +104,18 @@ impl ExecutionPlan for CollectExec {
select_all: Box::pin(futures::stream::select_all(streams)),
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "CollectExec")
+ }
+ }
+ }
}
struct MergedRecordBatchStream {
diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs
index ed9b0b0..24631c5 100644
--- a/datafusion/src/physical_plan/json.rs
+++ b/datafusion/src/physical_plan/json.rs
@@ -19,6 +19,7 @@
use async_trait::async_trait;
use futures::Stream;
+use super::DisplayFormatType;
use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
use crate::error::{DataFusionError, Result};
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
@@ -311,6 +312,18 @@ impl ExecutionPlan for NdJsonExec {
}
}
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "NdJsonExec: source={:?}", self.source)
+ }
+ }
+ }
}
struct NdJsonStream<R: Read> {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index dda0b66..1cc3625 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -1386,6 +1386,7 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
mod tests {
use super::*;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
+ use crate::physical_plan::DisplayFormatType;
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
use crate::scalar::ScalarValue;
use crate::{
@@ -1787,6 +1788,18 @@ mod tests {
async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "NoOpExecutionPlan")
+ }
+ }
+ }
}
// Produces an execution plan where the schema is mismatched from
diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs
index cbab728..932bd5c 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -25,7 +25,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
-use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use super::{DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream};
use crate::error::Result;
use async_trait::async_trait;
@@ -94,6 +94,18 @@ impl ExecutionPlan for UnionExec {
partition
)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "UnionExec")
+ }
+ }
+ }
}
#[cfg(test)]
diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs
index 2ff1f34..c746647 100644
--- a/datafusion/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -19,8 +19,8 @@
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
- common, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, WindowExpr,
+ common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ RecordBatchStream, SendableRecordBatchStream, WindowExpr,
};
use arrow::{
array::ArrayRef,
@@ -143,6 +143,25 @@ impl ExecutionPlan for WindowAggExec {
));
Ok(stream)
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "WindowAggExec: ")?;
+ let exprs: Vec<String> = self
+ .window_expr
+ .iter()
+ .map(|e| format!("{}: {:?}", e.name(), e.field()))
+ .collect();
+ write!(f, "wdw=[{}]", exprs.join(", "))?;
+ }
+ }
+ Ok(())
+ }
}
fn create_schema(
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 247dab1..fa1f36c 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -33,7 +33,8 @@ use arrow::{
use futures::Stream;
use crate::physical_plan::{
- ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
};
use crate::{
error::{DataFusionError, Result},
@@ -190,6 +191,18 @@ impl ExecutionPlan for MockExec {
// returned stream simply reads off the rx stream
Ok(RecordBatchReceiverStream::create(&self.schema, rx))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "MockExec")
+ }
+ }
+ }
}
fn clone_error(e: &ArrowError) -> ArrowError {
@@ -281,6 +294,18 @@ impl ExecutionPlan for BarrierExec {
// returned stream simply reads off the rx stream
Ok(RecordBatchReceiverStream::create(&self.schema, rx))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "BarrierExec")
+ }
+ }
+ }
}
/// A mock execution plan that errors on a call to execute
@@ -331,4 +356,16 @@ impl ExecutionPlan for ErrorExec {
partition
)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "ErrorExec")
+ }
+ }
+ }
}
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index 0bf67be..07b0eb2 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -26,7 +26,9 @@ use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::common::SizedRecordBatchStream;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use datafusion::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use std::sync::Arc;
@@ -84,6 +86,18 @@ impl ExecutionPlan for CustomPlan {
self.batches.clone(),
)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "CustomPlan: num_batches={}", self.batches.len())
+ }
+ }
+ }
}
#[derive(Clone)]