You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 19:34:35 UTC
[arrow-datafusion] branch master updated: enable explain for ballista (#2163)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6504d2a78 enable explain for ballista (#2163)
6504d2a78 is described below
commit 6504d2a781391df659f9be2ea9a24b4e8a0fdfee
Author: Jie Han <11...@users.noreply.github.com>
AuthorDate: Fri Apr 8 03:34:28 2022 +0800
enable explain for ballista (#2163)
* explain
* fmt
---
ballista/rust/core/proto/ballista.proto | 9 ++-
ballista/rust/core/proto/datafusion.proto | 24 +++++++
ballista/rust/core/src/serde/physical_plan/mod.rs | 77 +++++++++++++++-------
datafusion/core/src/optimizer/limit_push_down.rs | 18 ++---
.../core/src/optimizer/projection_push_down.rs | 36 +++++-----
datafusion/core/src/physical_plan/explain.rs | 5 ++
datafusion/proto/proto/datafusion.proto | 24 +++++++
datafusion/proto/src/from_proto.rs | 43 +++++++++++-
datafusion/proto/src/to_proto.rs | 47 ++++++++++++-
9 files changed, 225 insertions(+), 58 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1c50db6fd..2f69d4a26 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -183,7 +183,7 @@ message AnalyzeNode {
bool verbose = 2;
}
-message ExplainNode{
+message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}
@@ -269,6 +269,7 @@ message PhysicalPlanNode {
AvroScanExecNode avro_scan = 20;
PhysicalExtensionNode extension = 21;
UnionExecNode union = 22;
+ ExplainExecNode explain = 23;
}
}
@@ -461,6 +462,12 @@ message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}
+message ExplainExecNode {
+ datafusion.Schema schema = 1;
+ repeated datafusion.StringifiedPlan stringified_plans = 2;
+ bool verbose = 3;
+}
+
message CrossJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
diff --git a/ballista/rust/core/proto/datafusion.proto b/ballista/rust/core/proto/datafusion.proto
index 6bf163a64..1dc9b34f7 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -515,3 +515,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}
+
+message OptimizedLogicalPlanType {
+ string optimizer_name = 1;
+}
+
+message OptimizedPhysicalPlanType {
+ string optimizer_name = 1;
+}
+
+message PlanType {
+ oneof plan_type_enum {
+ EmptyMessage InitialLogicalPlan = 1;
+ OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
+ EmptyMessage FinalLogicalPlan = 3;
+ EmptyMessage InitialPhysicalPlan = 4;
+ OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
+ EmptyMessage FinalPhysicalPlan = 6;
+ }
+}
+
+message StringifiedPlan {
+ PlanType plan_type = 1;
+ string plan = 2;
+}
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 3abb0713d..32eda9f3a 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -15,24 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::BallistaError;
-use crate::execution_plans::{
- ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
-use crate::serde::physical_plan::from_proto::{
- parse_physical_expr, parse_protobuf_hash_partitioning,
-};
-use crate::serde::protobuf::physical_expr_node::ExprType;
-use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use std::convert::TryInto;
+use std::sync::Arc;
+
+use prost::bytes::BufMut;
+use prost::Message;
-use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
-use crate::serde::scheduler::PartitionLocation;
-use crate::serde::{
- byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
- PhysicalExtensionCodec,
-};
-use crate::{convert_required, into_physical_plan, into_required};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
@@ -45,6 +33,7 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
@@ -62,10 +51,24 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
use datafusion_proto::from_proto::parse_expr;
-use prost::bytes::BufMut;
-use prost::Message;
-use std::convert::TryInto;
-use std::sync::Arc;
+
+use crate::error::BallistaError;
+use crate::execution_plans::{
+ ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
+use crate::serde::physical_plan::from_proto::{
+ parse_physical_expr, parse_protobuf_hash_partitioning,
+};
+use crate::serde::protobuf::physical_expr_node::ExprType;
+use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
+use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
+use crate::serde::scheduler::PartitionLocation;
+use crate::serde::{
+ byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
+ PhysicalExtensionCodec,
+};
+use crate::{convert_required, into_physical_plan, into_required};
pub mod from_proto;
pub mod to_proto;
@@ -103,6 +106,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
))
})?;
match plan {
+ PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
+ Arc::new(explain.schema.as_ref().unwrap().try_into()?),
+ explain
+ .stringified_plans
+ .iter()
+ .map(|plan| plan.into())
+ .collect(),
+ explain.verbose,
+ ))),
PhysicalPlanType::Projection(projection) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
projection.input,
@@ -587,7 +599,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
let plan_clone = plan.clone();
let plan = plan.as_any();
- if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
+ if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
+ Ok(protobuf::PhysicalPlanNode {
+ physical_plan_type: Some(PhysicalPlanType::Explain(
+ protobuf::ExplainExecNode {
+ schema: Some(exec.schema().as_ref().into()),
+ stringified_plans: exec
+ .stringified_plans()
+ .iter()
+ .map(|plan| plan.into())
+ .collect(),
+ verbose: exec.verbose(),
+ },
+ )),
+ })
+ } else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
@@ -1038,7 +1064,6 @@ mod roundtrip_tests {
use std::ops::Deref;
use std::sync::Arc;
- use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::arrow::array::ArrayRef;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::create_udf;
@@ -1071,10 +1096,12 @@ mod roundtrip_tests {
scalar::ScalarValue,
};
- use super::super::super::error::Result;
- use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+ use crate::serde::{AsExecutionPlan, BallistaCodec};
+
+ use super::super::super::error::Result;
+ use super::super::protobuf;
fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let ctx = SessionContext::new();
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 4fa6e2786..0c68f1761 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -39,10 +39,10 @@ impl LimitPushDown {
}
fn limit_push_down(
- optimizer: &LimitPushDown,
+ _optimizer: &LimitPushDown,
upper_limit: Option<usize>,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ _execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
(LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
@@ -51,10 +51,10 @@ fn limit_push_down(
n: smallest,
// push down limit to plan (minimum of upper limit and current limit)
input: Arc::new(limit_push_down(
- optimizer,
+ _optimizer,
Some(smallest),
input.as_ref(),
- execution_props,
+ _execution_props,
)?),
}))
}
@@ -91,10 +91,10 @@ fn limit_push_down(
Ok(LogicalPlan::Projection(Projection {
expr: expr.clone(),
input: Arc::new(limit_push_down(
- optimizer,
+ _optimizer,
upper_limit,
input.as_ref(),
- execution_props,
+ _execution_props,
)?),
schema: schema.clone(),
alias: alias.clone(),
@@ -115,10 +115,10 @@ fn limit_push_down(
Ok(LogicalPlan::Limit(Limit {
n: upper_limit,
input: Arc::new(limit_push_down(
- optimizer,
+ _optimizer,
Some(upper_limit),
x,
- execution_props,
+ _execution_props,
)?),
}))
})
@@ -138,7 +138,7 @@ fn limit_push_down(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| limit_push_down(optimizer, None, plan, execution_props))
+ .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 7f43b5595..5b61ace11 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -126,11 +126,11 @@ fn get_projected_schema(
/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
- optimizer: &ProjectionPushDown,
+ _optimizer: &ProjectionPushDown,
plan: &LogicalPlan,
required_columns: &HashSet<Column>, // set of columns required up to this step
has_projection: bool,
- execution_props: &ExecutionProps,
+ _execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
@@ -165,11 +165,11 @@ fn optimize_plan(
})?;
let new_input = optimize_plan(
- optimizer,
+ _optimizer,
input,
&new_required_columns,
true,
- execution_props,
+ _execution_props,
)?;
let new_required_columns_optimized = new_input
@@ -211,19 +211,19 @@ fn optimize_plan(
}
let optimized_left = Arc::new(optimize_plan(
- optimizer,
+ _optimizer,
left,
&new_required_columns,
true,
- execution_props,
+ _execution_props,
)?);
let optimized_right = Arc::new(optimize_plan(
- optimizer,
+ _optimizer,
right,
&new_required_columns,
true,
- execution_props,
+ _execution_props,
)?);
let schema = build_join_schema(
@@ -272,11 +272,11 @@ fn optimize_plan(
)?;
LogicalPlanBuilder::from(optimize_plan(
- optimizer,
+ _optimizer,
input,
&new_required_columns,
true,
- execution_props,
+ _execution_props,
)?)
.window(new_window_expr)?
.build()
@@ -324,11 +324,11 @@ fn optimize_plan(
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
- optimizer,
+ _optimizer,
input,
&new_required_columns,
true,
- execution_props,
+ _execution_props,
)?),
schema: DFSchemaRef::new(new_schema),
}))
@@ -373,11 +373,11 @@ fn optimize_plan(
Ok(LogicalPlan::Analyze(Analyze {
input: Arc::new(optimize_plan(
- optimizer,
+ _optimizer,
&a.input,
&required_columns,
false,
- execution_props,
+ _execution_props,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
@@ -409,11 +409,11 @@ fn optimize_plan(
new_required_columns.insert(f.qualified_column());
});
optimize_plan(
- optimizer,
+ _optimizer,
input_plan,
&new_required_columns,
has_projection,
- execution_props,
+ _execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -457,11 +457,11 @@ fn optimize_plan(
.iter()
.map(|input_plan| {
optimize_plan(
- optimizer,
+ _optimizer,
input_plan,
&new_required_columns,
has_projection,
- execution_props,
+ _execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index d09eae3ae..fd5ff03ca 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -66,6 +66,11 @@ impl ExplainExec {
pub fn stringified_plans(&self) -> &[StringifiedPlan] {
&self.stringified_plans
}
+
+ /// Returns the `verbose` flag this `ExplainExec` was created with.
+ pub fn verbose(&self) -> bool {
+ self.verbose
+ }
}
#[async_trait]
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 1bb9aef3c..1e5a797ce 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -540,3 +540,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}
+
+message OptimizedLogicalPlanType {
+ string optimizer_name = 1;
+}
+
+message OptimizedPhysicalPlanType {
+ string optimizer_name = 1;
+}
+
+message PlanType {
+ oneof plan_type_enum {
+ EmptyMessage InitialLogicalPlan = 1;
+ OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
+ EmptyMessage FinalLogicalPlan = 3;
+ EmptyMessage InitialPhysicalPlan = 4;
+ OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
+ EmptyMessage FinalPhysicalPlan = 6;
+ }
+}
+
+message StringifiedPlan {
+ PlanType plan_type = 1;
+ string plan = 2;
+}
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index c98fc824b..0aa24dd8e 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -16,7 +16,13 @@
// under the License.
use crate::protobuf;
-use datafusion::logical_plan::FunctionRegistry;
+use crate::protobuf::plan_type::PlanTypeEnum::{
+ FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
+ OptimizedLogicalPlan, OptimizedPhysicalPlan,
+};
+use crate::protobuf::{OptimizedLogicalPlanType, OptimizedPhysicalPlanType};
+use datafusion::logical_plan::plan::StringifiedPlan;
+use datafusion::logical_plan::{FunctionRegistry, PlanType};
use datafusion::prelude::bit_length;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
@@ -363,6 +369,37 @@ impl TryFrom<&protobuf::Field> for Field {
}
}
+impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
+ fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
+ Self {
+ plan_type: match stringified_plan
+ .plan_type
+ .as_ref()
+ .unwrap()
+ .plan_type_enum
+ .as_ref()
+ .unwrap()
+ {
+ InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
+ OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
+ PlanType::OptimizedLogicalPlan {
+ optimizer_name: optimizer_name.clone(),
+ }
+ }
+ FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
+ InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
+ OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
+ PlanType::OptimizedPhysicalPlan {
+ optimizer_name: optimizer_name.clone(),
+ }
+ }
+ FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
+ },
+ plan: Arc::new(stringified_plan.plan.clone()),
+ }
+ }
+}
+
impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
fn from(f: &protobuf::ScalarFunction) -> Self {
use protobuf::ScalarFunction;
@@ -721,7 +758,7 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
Ok(match scalar {
PrimitiveScalarType::Null => {
- return Err(proto_error("Untyped null is an invalid scalar value"))
+ return Err(proto_error("Untyped null is an invalid scalar value"));
}
PrimitiveScalarType::Bool => Self::Boolean(None),
PrimitiveScalarType::Uint8 => Self::UInt8(None),
@@ -1450,7 +1487,7 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::Null => {
return Err(proto_error(
"Untyped scalar null is not a valid scalar value",
- ))
+ ));
}
PrimitiveScalarType::Decimal128 => {
ScalarValue::Decimal128(None, 0, 0)
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 332cbbde7..1e05f8047 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -20,7 +20,16 @@
//! processes.
use crate::protobuf;
+use crate::protobuf::plan_type::PlanTypeEnum::{
+ FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
+ OptimizedLogicalPlan, OptimizedPhysicalPlan,
+};
+use crate::protobuf::{
+ EmptyMessage, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
+};
+use datafusion::logical_plan::plan::StringifiedPlan;
+use datafusion::logical_plan::PlanType;
use datafusion::{
arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
@@ -143,8 +152,6 @@ impl From<&DataType> for protobuf::ArrowType {
impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fn from(val: &DataType) -> Self {
- use protobuf::EmptyMessage;
-
match val {
DataType::Null => Self::None(EmptyMessage {}),
DataType::Boolean => Self::Bool(EmptyMessage {}),
@@ -294,6 +301,42 @@ impl From<&DFSchemaRef> for protobuf::DfSchema {
}
}
+impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
+ fn from(stringified_plan: &StringifiedPlan) -> Self {
+ Self {
+ plan_type: match stringified_plan.clone().plan_type {
+ PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
+ plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
+ }),
+ PlanType::OptimizedLogicalPlan { optimizer_name } => {
+ Some(protobuf::PlanType {
+ plan_type_enum: Some(OptimizedLogicalPlan(
+ OptimizedLogicalPlanType { optimizer_name },
+ )),
+ })
+ }
+ PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
+ plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
+ }),
+ PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
+ plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
+ }),
+ PlanType::OptimizedPhysicalPlan { optimizer_name } => {
+ Some(protobuf::PlanType {
+ plan_type_enum: Some(OptimizedPhysicalPlan(
+ OptimizedPhysicalPlanType { optimizer_name },
+ )),
+ })
+ }
+ PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
+ plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
+ }),
+ },
+ plan: stringified_plan.plan.to_string(),
+ }
+ }
+}
+
impl From<&AggregateFunction> for protobuf::AggregateFunction {
fn from(value: &AggregateFunction) -> Self {
match value {