You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/16 13:12:49 UTC
[arrow-datafusion] branch main updated: Add analyzer output to verbose explain (#6020)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 99e39fe1b8 Add analyzer output to verbose explain (#6020)
99e39fe1b8 is described below
commit 99e39fe1b844154bcf42ee28638366d23a5c3849
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Sun Apr 16 23:12:40 2023 +1000
Add analyzer output to verbose explain (#6020)
* Add analyzer output to verbose explain
* fmt
---
datafusion-examples/examples/rewrite_expr.rs | 3 +-
datafusion/core/src/execution/context.rs | 38 ++++++-
datafusion/core/tests/sql/subqueries.rs | 4 +-
.../core/tests/sqllogictests/test_files/dates.slt | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 11 ++
datafusion/optimizer/src/analyzer/mod.rs | 18 +++-
datafusion/optimizer/src/analyzer/type_coercion.rs | 4 +-
datafusion/optimizer/src/test/mod.rs | 2 +-
datafusion/optimizer/tests/integration-test.rs | 8 +-
datafusion/proto/proto/datafusion.proto | 6 ++
datafusion/proto/src/generated/pbjson.rs | 118 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 12 ++-
datafusion/proto/src/logical_plan/from_proto.rs | 15 ++-
datafusion/proto/src/logical_plan/to_proto.rs | 19 +++-
14 files changed, 231 insertions(+), 29 deletions(-)
diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index 451205e4cb..2777781eb9 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -50,7 +50,8 @@ pub fn main() -> Result<()> {
// run the analyzer with our custom rule
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
- let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?;
+ let analyzed_plan =
+ analyzer.execute_and_check(&logical_plan, config.options(), |_, _| {})?;
println!(
"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 7c2255eaf3..d25f96f649 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1675,9 +1675,37 @@ impl SessionState {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
- let analyzed_plan = self
- .analyzer
- .execute_and_check(e.plan.as_ref(), self.options())?;
+ // analyze & capture output of each rule
+ let analyzed_plan = match self.analyzer.execute_and_check(
+ e.plan.as_ref(),
+ self.options(),
+ |analyzed_plan, analyzer| {
+ let analyzer_name = analyzer.name().to_string();
+ let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+ stringified_plans.push(analyzed_plan.to_stringified(plan_type));
+ },
+ ) {
+ Ok(plan) => plan,
+ Err(DataFusionError::Context(analyzer_name, err)) => {
+ let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+ stringified_plans
+ .push(StringifiedPlan::new(plan_type, err.to_string()));
+
+ return Ok(LogicalPlan::Explain(Explain {
+ verbose: e.verbose,
+ plan: e.plan.clone(),
+ stringified_plans,
+ schema: e.schema.clone(),
+ logical_optimization_succeeded: false,
+ }));
+ }
+ Err(e) => return Err(e),
+ };
+
+ // to delineate the analyzer & optimizer phases in explain output
+ stringified_plans
+ .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
+
// optimize the child plan, capturing the output of each optimizer
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
&analyzed_plan,
@@ -1706,7 +1734,9 @@ impl SessionState {
logical_optimization_succeeded,
}))
} else {
- let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?;
+ let analyzed_plan =
+ self.analyzer
+ .execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
}
}
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 6bdfbb7adf..eb7a12045c 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -185,7 +185,7 @@ async fn invalid_scalar_subquery() -> Result<()> {
let dataframe = ctx.sql(sql).await.expect(&msg);
let err = dataframe.into_optimized_plan().err().unwrap();
assert_eq!(
- "Plan(\"Scalar subquery should only return one column\")",
+ r#"Context("check_analyzed_plan", Plan("Scalar subquery should only return one column"))"#,
&format!("{err:?}")
);
@@ -203,7 +203,7 @@ async fn subquery_not_allowed() -> Result<()> {
let err = dataframe.into_optimized_plan().err().unwrap();
assert_eq!(
- "Plan(\"In/Exist subquery can not be used in Sort plan nodes\")",
+ r#"Context("check_analyzed_plan", Plan("In/Exist subquery can not be used in Sort plan nodes"))"#,
&format!("{err:?}")
);
diff --git a/datafusion/core/tests/sqllogictests/test_files/dates.slt b/datafusion/core/tests/sqllogictests/test_files/dates.slt
index 4102460704..6ab4730ef4 100644
--- a/datafusion/core/tests/sqllogictests/test_files/dates.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/dates.slt
@@ -85,7 +85,7 @@ g
h
## Plan error when compare Utf8 and timestamp in where clause
-statement error DataFusion error: Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
+statement error Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
select i_item_desc from test
where d3_date > now() + '5 days';
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 1e85e13684..50d5e7d607 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1813,6 +1813,13 @@ pub enum Partitioning {
pub enum PlanType {
/// The initial LogicalPlan provided to DataFusion
InitialLogicalPlan,
+ /// The LogicalPlan which results from applying an analyzer pass
+ AnalyzedLogicalPlan {
+ /// The name of the analyzer which produced this plan
+ analyzer_name: String,
+ },
+ /// The LogicalPlan after all analyzer passes have been applied
+ FinalAnalyzedLogicalPlan,
/// The LogicalPlan which results from applying an optimizer pass
OptimizedLogicalPlan {
/// The name of the optimizer which produced this plan
@@ -1835,6 +1842,10 @@ impl Display for PlanType {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
+ PlanType::AnalyzedLogicalPlan { analyzer_name } => {
+ write!(f, "logical_plan after {analyzer_name}")
+ }
+ PlanType::FinalAnalyzedLogicalPlan => write!(f, "analyzed_logical_plan"),
PlanType::OptimizedLogicalPlan { optimizer_name } => {
write!(f, "logical_plan after {optimizer_name}")
}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 401db7f025..595cd57deb 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -80,19 +80,29 @@ impl Analyzer {
/// Analyze the logical plan by applying analyzer rules, and
/// do necessary check and fail the invalid plans
- pub fn execute_and_check(
+ pub fn execute_and_check<F>(
&self,
plan: &LogicalPlan,
config: &ConfigOptions,
- ) -> Result<LogicalPlan> {
+ mut observer: F,
+ ) -> Result<LogicalPlan>
+ where
+ F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
+ {
let start_time = Instant::now();
let mut new_plan = plan.clone();
// TODO add common rule executor for Analyzer and Optimizer
for rule in &self.rules {
- new_plan = rule.analyze(new_plan, config)?;
+ new_plan = rule.analyze(new_plan, config).map_err(|e| {
+ DataFusionError::Context(rule.name().to_string(), Box::new(e))
+ })?;
+ observer(&new_plan, rule.as_ref());
}
- check_plan(&new_plan)?;
+ // for easier display in explain output
+ check_plan(&new_plan).map_err(|e| {
+ DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
+ })?;
log_plan("Final analyzed plan", &new_plan);
debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
Ok(new_plan)
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 659b9fe164..9fb27036aa 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -835,7 +835,7 @@ mod test {
.err()
.unwrap();
assert_eq!(
- "Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed.\")",
+ r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed."))"#,
&format!("{err:?}")
);
Ok(())
@@ -914,7 +914,7 @@ mod test {
.err()
.unwrap();
assert_eq!(
- "Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed.\")",
+ r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed."))"#,
&format!("{err:?}")
);
Ok(())
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index 67d342b4cb..899dc74c79 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -115,7 +115,7 @@ pub fn assert_analyzed_plan_eq(
) -> Result<()> {
let options = ConfigOptions::default();
let analyzed_plan =
- Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options)?;
+ Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan:?}");
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index e58a2aaa00..43f329843e 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
+use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
@@ -351,8 +351,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
- let plan = analyzer.execute_and_check(&plan, config.options())?;
- optimizer.optimize(&plan, &config, &observe)
+ let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
+ optimizer.optimize(&plan, &config, |_, _| {})
}
#[derive(Default)]
@@ -412,8 +412,6 @@ impl ContextProvider for MySchemaProvider {
}
}
-fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
-
struct MyTableSource {
schema: SchemaRef,
}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 49cf9c9806..1b8f999f28 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -926,6 +926,10 @@ message ArrowType{
//}
message EmptyMessage{}
+message AnalyzedLogicalPlanType {
+ string analyzer_name = 1;
+}
+
message OptimizedLogicalPlanType {
string optimizer_name = 1;
}
@@ -937,6 +941,8 @@ message OptimizedPhysicalPlanType {
message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
+ AnalyzedLogicalPlanType AnalyzedLogicalPlan = 7;
+ EmptyMessage FinalAnalyzedLogicalPlan = 8;
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 3dca53e7b5..c2bfe3d08a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1068,6 +1068,98 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
deserializer.deserialize_struct("datafusion.AnalyzeNode", FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for AnalyzedLogicalPlanType {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.analyzer_name.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.AnalyzedLogicalPlanType", len)?;
+ if !self.analyzer_name.is_empty() {
+ struct_ser.serialize_field("analyzerName", &self.analyzer_name)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for AnalyzedLogicalPlanType {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "analyzer_name",
+ "analyzerName",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ AnalyzerName,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "analyzerName" | "analyzer_name" => Ok(GeneratedField::AnalyzerName),
+ _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = AnalyzedLogicalPlanType;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("struct datafusion.AnalyzedLogicalPlanType")
+ }
+
+ fn visit_map<V>(self, mut map: V) -> std::result::Result<AnalyzedLogicalPlanType, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut analyzer_name__ = None;
+ while let Some(k) = map.next_key()? {
+ match k {
+ GeneratedField::AnalyzerName => {
+ if analyzer_name__.is_some() {
+ return Err(serde::de::Error::duplicate_field("analyzerName"));
+ }
+ analyzer_name__ = Some(map.next_value()?);
+ }
+ }
+ }
+ Ok(AnalyzedLogicalPlanType {
+ analyzer_name: analyzer_name__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.AnalyzedLogicalPlanType", FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for ArrowType {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -15923,6 +16015,12 @@ impl serde::Serialize for PlanType {
plan_type::PlanTypeEnum::InitialLogicalPlan(v) => {
struct_ser.serialize_field("InitialLogicalPlan", v)?;
}
+ plan_type::PlanTypeEnum::AnalyzedLogicalPlan(v) => {
+ struct_ser.serialize_field("AnalyzedLogicalPlan", v)?;
+ }
+ plan_type::PlanTypeEnum::FinalAnalyzedLogicalPlan(v) => {
+ struct_ser.serialize_field("FinalAnalyzedLogicalPlan", v)?;
+ }
plan_type::PlanTypeEnum::OptimizedLogicalPlan(v) => {
struct_ser.serialize_field("OptimizedLogicalPlan", v)?;
}
@@ -15951,6 +16049,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
{
const FIELDS: &[&str] = &[
"InitialLogicalPlan",
+ "AnalyzedLogicalPlan",
+ "FinalAnalyzedLogicalPlan",
"OptimizedLogicalPlan",
"FinalLogicalPlan",
"InitialPhysicalPlan",
@@ -15961,6 +16061,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
InitialLogicalPlan,
+ AnalyzedLogicalPlan,
+ FinalAnalyzedLogicalPlan,
OptimizedLogicalPlan,
FinalLogicalPlan,
InitialPhysicalPlan,
@@ -15988,6 +16090,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
{
match value {
"InitialLogicalPlan" => Ok(GeneratedField::InitialLogicalPlan),
+ "AnalyzedLogicalPlan" => Ok(GeneratedField::AnalyzedLogicalPlan),
+ "FinalAnalyzedLogicalPlan" => Ok(GeneratedField::FinalAnalyzedLogicalPlan),
"OptimizedLogicalPlan" => Ok(GeneratedField::OptimizedLogicalPlan),
"FinalLogicalPlan" => Ok(GeneratedField::FinalLogicalPlan),
"InitialPhysicalPlan" => Ok(GeneratedField::InitialPhysicalPlan),
@@ -16020,6 +16124,20 @@ impl<'de> serde::Deserialize<'de> for PlanType {
return Err(serde::de::Error::duplicate_field("InitialLogicalPlan"));
}
plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::InitialLogicalPlan)
+;
+ }
+ GeneratedField::AnalyzedLogicalPlan => {
+ if plan_type_enum__.is_some() {
+ return Err(serde::de::Error::duplicate_field("AnalyzedLogicalPlan"));
+ }
+ plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::AnalyzedLogicalPlan)
+;
+ }
+ GeneratedField::FinalAnalyzedLogicalPlan => {
+ if plan_type_enum__.is_some() {
+ return Err(serde::de::Error::duplicate_field("FinalAnalyzedLogicalPlan"));
+ }
+ plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::FinalAnalyzedLogicalPlan)
;
}
GeneratedField::OptimizedLogicalPlan => {
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 7e2d10ba83..84689e56f5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1245,6 +1245,12 @@ pub mod arrow_type {
pub struct EmptyMessage {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AnalyzedLogicalPlanType {
+ #[prost(string, tag = "1")]
+ pub analyzer_name: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OptimizedLogicalPlanType {
#[prost(string, tag = "1")]
pub optimizer_name: ::prost::alloc::string::String,
@@ -1258,7 +1264,7 @@ pub struct OptimizedPhysicalPlanType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PlanType {
- #[prost(oneof = "plan_type::PlanTypeEnum", tags = "1, 2, 3, 4, 5, 6")]
+ #[prost(oneof = "plan_type::PlanTypeEnum", tags = "1, 7, 8, 2, 3, 4, 5, 6")]
pub plan_type_enum: ::core::option::Option<plan_type::PlanTypeEnum>,
}
/// Nested message and enum types in `PlanType`.
@@ -1268,6 +1274,10 @@ pub mod plan_type {
pub enum PlanTypeEnum {
#[prost(message, tag = "1")]
InitialLogicalPlan(super::EmptyMessage),
+ #[prost(message, tag = "7")]
+ AnalyzedLogicalPlan(super::AnalyzedLogicalPlanType),
+ #[prost(message, tag = "8")]
+ FinalAnalyzedLogicalPlan(super::EmptyMessage),
#[prost(message, tag = "2")]
OptimizedLogicalPlan(super::OptimizedLogicalPlanType),
#[prost(message, tag = "3")]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index bd544b93bd..fdb048c78e 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -18,11 +18,12 @@
use crate::protobuf::{
self,
plan_type::PlanTypeEnum::{
- FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
- OptimizedLogicalPlan, OptimizedPhysicalPlan,
+ AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
+ FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, OptimizedLogicalPlan,
+ OptimizedPhysicalPlan,
},
- CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
- PlaceholderNode, RollupNode,
+ AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
+ OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
};
use arrow::datatypes::{
DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit,
@@ -377,6 +378,12 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
)
}) {
InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
+ AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
+ PlanType::AnalyzedLogicalPlan {
+ analyzer_name:analyzer_name.clone()
+ }
+ }
+ FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
PlanType::OptimizedLogicalPlan {
optimizer_name: optimizer_name.clone(),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 7e6aa6b687..53bc8a7dce 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -23,11 +23,12 @@ use crate::protobuf::{
self,
arrow_type::ArrowTypeEnum,
plan_type::PlanTypeEnum::{
- FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
- OptimizedLogicalPlan, OptimizedPhysicalPlan,
+ AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
+ FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, OptimizedLogicalPlan,
+ OptimizedPhysicalPlan,
},
- CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, OptimizedLogicalPlanType,
- OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
+ AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
+ OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
};
use arrow::datatypes::{
DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit,
@@ -318,6 +319,16 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
}),
+ PlanType::AnalyzedLogicalPlan { analyzer_name } => {
+ Some(protobuf::PlanType {
+ plan_type_enum: Some(AnalyzedLogicalPlan(
+ AnalyzedLogicalPlanType { analyzer_name },
+ )),
+ })
+ }
+ PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
+ plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
+ }),
PlanType::OptimizedLogicalPlan { optimizer_name } => {
Some(protobuf::PlanType {
plan_type_enum: Some(OptimizedLogicalPlan(