You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/16 13:12:49 UTC

[arrow-datafusion] branch main updated: Add analyzer output to verbose explain (#6020)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 99e39fe1b8 Add analyzer output to verbose explain (#6020)
99e39fe1b8 is described below

commit 99e39fe1b844154bcf42ee28638366d23a5c3849
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Sun Apr 16 23:12:40 2023 +1000

    Add analyzer output to verbose explain (#6020)
    
    * Add analyzer output to verbose explain
    
    * fmt
---
 datafusion-examples/examples/rewrite_expr.rs       |   3 +-
 datafusion/core/src/execution/context.rs           |  38 ++++++-
 datafusion/core/tests/sql/subqueries.rs            |   4 +-
 .../core/tests/sqllogictests/test_files/dates.slt  |   2 +-
 datafusion/expr/src/logical_plan/plan.rs           |  11 ++
 datafusion/optimizer/src/analyzer/mod.rs           |  18 +++-
 datafusion/optimizer/src/analyzer/type_coercion.rs |   4 +-
 datafusion/optimizer/src/test/mod.rs               |   2 +-
 datafusion/optimizer/tests/integration-test.rs     |   8 +-
 datafusion/proto/proto/datafusion.proto            |   6 ++
 datafusion/proto/src/generated/pbjson.rs           | 118 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  12 ++-
 datafusion/proto/src/logical_plan/from_proto.rs    |  15 ++-
 datafusion/proto/src/logical_plan/to_proto.rs      |  19 +++-
 14 files changed, 231 insertions(+), 29 deletions(-)

diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index 451205e4cb..2777781eb9 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -50,7 +50,8 @@ pub fn main() -> Result<()> {
     // run the analyzer with our custom rule
     let config = OptimizerContext::default().with_skip_failing_rules(false);
     let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
-    let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?;
+    let analyzed_plan =
+        analyzer.execute_and_check(&logical_plan, config.options(), |_, _| {})?;
     println!(
         "Analyzed Logical Plan:\n\n{}\n",
         analyzed_plan.display_indent()
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 7c2255eaf3..d25f96f649 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1675,9 +1675,37 @@ impl SessionState {
         if let LogicalPlan::Explain(e) = plan {
             let mut stringified_plans = e.stringified_plans.clone();
 
-            let analyzed_plan = self
-                .analyzer
-                .execute_and_check(e.plan.as_ref(), self.options())?;
+            // analyze & capture output of each rule
+            let analyzed_plan = match self.analyzer.execute_and_check(
+                e.plan.as_ref(),
+                self.options(),
+                |analyzed_plan, analyzer| {
+                    let analyzer_name = analyzer.name().to_string();
+                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+                    stringified_plans.push(analyzed_plan.to_stringified(plan_type));
+                },
+            ) {
+                Ok(plan) => plan,
+                Err(DataFusionError::Context(analyzer_name, err)) => {
+                    let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
+                    stringified_plans
+                        .push(StringifiedPlan::new(plan_type, err.to_string()));
+
+                    return Ok(LogicalPlan::Explain(Explain {
+                        verbose: e.verbose,
+                        plan: e.plan.clone(),
+                        stringified_plans,
+                        schema: e.schema.clone(),
+                        logical_optimization_succeeded: false,
+                    }));
+                }
+                Err(e) => return Err(e),
+            };
+
+            // to delineate the analyzer & optimizer phases in explain output
+            stringified_plans
+                .push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
+
             // optimize the child plan, capturing the output of each optimizer
             let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
                 &analyzed_plan,
@@ -1706,7 +1734,9 @@ impl SessionState {
                 logical_optimization_succeeded,
             }))
         } else {
-            let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?;
+            let analyzed_plan =
+                self.analyzer
+                    .execute_and_check(plan, self.options(), |_, _| {})?;
             self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
         }
     }
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 6bdfbb7adf..eb7a12045c 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -185,7 +185,7 @@ async fn invalid_scalar_subquery() -> Result<()> {
     let dataframe = ctx.sql(sql).await.expect(&msg);
     let err = dataframe.into_optimized_plan().err().unwrap();
     assert_eq!(
-        "Plan(\"Scalar subquery should only return one column\")",
+        r#"Context("check_analyzed_plan", Plan("Scalar subquery should only return one column"))"#,
         &format!("{err:?}")
     );
 
@@ -203,7 +203,7 @@ async fn subquery_not_allowed() -> Result<()> {
     let err = dataframe.into_optimized_plan().err().unwrap();
 
     assert_eq!(
-        "Plan(\"In/Exist subquery can not be used in Sort plan nodes\")",
+        r#"Context("check_analyzed_plan", Plan("In/Exist subquery can not be used in Sort plan nodes"))"#,
         &format!("{err:?}")
     );
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/dates.slt b/datafusion/core/tests/sqllogictests/test_files/dates.slt
index 4102460704..6ab4730ef4 100644
--- a/datafusion/core/tests/sqllogictests/test_files/dates.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/dates.slt
@@ -85,7 +85,7 @@ g
 h
 
 ## Plan error when compare Utf8 and timestamp in where clause
-statement error DataFusion error: Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
+statement error Error during planning: Timestamp\(Nanosecond, Some\("\+00:00"\)\) \+ Utf8 can't be evaluated because there isn't a common type to coerce the types to
 select i_item_desc from test
 where d3_date > now() + '5 days';
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 1e85e13684..50d5e7d607 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1813,6 +1813,13 @@ pub enum Partitioning {
 pub enum PlanType {
     /// The initial LogicalPlan provided to DataFusion
     InitialLogicalPlan,
+    /// The LogicalPlan which results from applying an analyzer pass
+    AnalyzedLogicalPlan {
+        /// The name of the analyzer which produced this plan
+        analyzer_name: String,
+    },
+    /// The LogicalPlan after all analyzer passes have been applied
+    FinalAnalyzedLogicalPlan,
     /// The LogicalPlan which results from applying an optimizer pass
     OptimizedLogicalPlan {
         /// The name of the optimizer which produced this plan
@@ -1835,6 +1842,10 @@ impl Display for PlanType {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         match self {
             PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
+            PlanType::AnalyzedLogicalPlan { analyzer_name } => {
+                write!(f, "logical_plan after {analyzer_name}")
+            }
+            PlanType::FinalAnalyzedLogicalPlan => write!(f, "analyzed_logical_plan"),
             PlanType::OptimizedLogicalPlan { optimizer_name } => {
                 write!(f, "logical_plan after {optimizer_name}")
             }
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 401db7f025..595cd57deb 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -80,19 +80,29 @@ impl Analyzer {
 
     /// Analyze the logical plan by applying analyzer rules, and
     /// do necessary check and fail the invalid plans
-    pub fn execute_and_check(
+    pub fn execute_and_check<F>(
         &self,
         plan: &LogicalPlan,
         config: &ConfigOptions,
-    ) -> Result<LogicalPlan> {
+        mut observer: F,
+    ) -> Result<LogicalPlan>
+    where
+        F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
+    {
         let start_time = Instant::now();
         let mut new_plan = plan.clone();
 
         // TODO add common rule executor for Analyzer and Optimizer
         for rule in &self.rules {
-            new_plan = rule.analyze(new_plan, config)?;
+            new_plan = rule.analyze(new_plan, config).map_err(|e| {
+                DataFusionError::Context(rule.name().to_string(), Box::new(e))
+            })?;
+            observer(&new_plan, rule.as_ref());
         }
-        check_plan(&new_plan)?;
+        // for easier display in explain output
+        check_plan(&new_plan).map_err(|e| {
+            DataFusionError::Context("check_analyzed_plan".to_string(), Box::new(e))
+        })?;
         log_plan("Final analyzed plan", &new_plan);
         debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 659b9fe164..9fb27036aa 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -835,7 +835,7 @@ mod test {
             .err()
             .unwrap();
         assert_eq!(
-            "Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed.\")",
+            r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Int32]) failed."))"#,
             &format!("{err:?}")
         );
         Ok(())
@@ -914,7 +914,7 @@ mod test {
             .err()
             .unwrap();
         assert_eq!(
-            "Plan(\"Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed.\")",
+            r#"Context("type_coercion", Plan("Coercion from [Utf8] to the signature Uniform(1, [Float64]) failed."))"#,
             &format!("{err:?}")
         );
         Ok(())
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index 67d342b4cb..899dc74c79 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -115,7 +115,7 @@ pub fn assert_analyzed_plan_eq(
 ) -> Result<()> {
     let options = ConfigOptions::default();
     let analyzed_plan =
-        Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options)?;
+        Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
     let formatted_plan = format!("{analyzed_plan:?}");
     assert_eq!(formatted_plan, expected);
 
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index e58a2aaa00..43f329843e 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
 use datafusion_optimizer::analyzer::Analyzer;
 use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
+use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::ast::Statement;
 use datafusion_sql::sqlparser::dialect::GenericDialect;
@@ -351,8 +351,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     let analyzer = Analyzer::new();
     let optimizer = Optimizer::new();
     // analyze and optimize the logical plan
-    let plan = analyzer.execute_and_check(&plan, config.options())?;
-    optimizer.optimize(&plan, &config, &observe)
+    let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
+    optimizer.optimize(&plan, &config, |_, _| {})
 }
 
 #[derive(Default)]
@@ -412,8 +412,6 @@ impl ContextProvider for MySchemaProvider {
     }
 }
 
-fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
-
 struct MyTableSource {
     schema: SchemaRef,
 }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 49cf9c9806..1b8f999f28 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -926,6 +926,10 @@ message ArrowType{
 //}
 message EmptyMessage{}
 
+message AnalyzedLogicalPlanType {
+  string analyzer_name = 1;
+}
+
 message OptimizedLogicalPlanType {
   string optimizer_name = 1;
 }
@@ -937,6 +941,8 @@ message OptimizedPhysicalPlanType {
 message PlanType {
   oneof plan_type_enum {
     EmptyMessage InitialLogicalPlan = 1;
+    AnalyzedLogicalPlanType AnalyzedLogicalPlan = 7;
+    EmptyMessage FinalAnalyzedLogicalPlan = 8;
     OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
     EmptyMessage FinalLogicalPlan = 3;
     EmptyMessage InitialPhysicalPlan = 4;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 3dca53e7b5..c2bfe3d08a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -1068,6 +1068,98 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
         deserializer.deserialize_struct("datafusion.AnalyzeNode", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for AnalyzedLogicalPlanType {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.analyzer_name.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.AnalyzedLogicalPlanType", len)?;
+        if !self.analyzer_name.is_empty() {
+            struct_ser.serialize_field("analyzerName", &self.analyzer_name)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for AnalyzedLogicalPlanType {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "analyzer_name",
+            "analyzerName",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            AnalyzerName,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "analyzerName" | "analyzer_name" => Ok(GeneratedField::AnalyzerName),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = AnalyzedLogicalPlanType;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.AnalyzedLogicalPlanType")
+            }
+
+            fn visit_map<V>(self, mut map: V) -> std::result::Result<AnalyzedLogicalPlanType, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut analyzer_name__ = None;
+                while let Some(k) = map.next_key()? {
+                    match k {
+                        GeneratedField::AnalyzerName => {
+                            if analyzer_name__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("analyzerName"));
+                            }
+                            analyzer_name__ = Some(map.next_value()?);
+                        }
+                    }
+                }
+                Ok(AnalyzedLogicalPlanType {
+                    analyzer_name: analyzer_name__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.AnalyzedLogicalPlanType", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ArrowType {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
@@ -15923,6 +16015,12 @@ impl serde::Serialize for PlanType {
                 plan_type::PlanTypeEnum::InitialLogicalPlan(v) => {
                     struct_ser.serialize_field("InitialLogicalPlan", v)?;
                 }
+                plan_type::PlanTypeEnum::AnalyzedLogicalPlan(v) => {
+                    struct_ser.serialize_field("AnalyzedLogicalPlan", v)?;
+                }
+                plan_type::PlanTypeEnum::FinalAnalyzedLogicalPlan(v) => {
+                    struct_ser.serialize_field("FinalAnalyzedLogicalPlan", v)?;
+                }
                 plan_type::PlanTypeEnum::OptimizedLogicalPlan(v) => {
                     struct_ser.serialize_field("OptimizedLogicalPlan", v)?;
                 }
@@ -15951,6 +16049,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
     {
         const FIELDS: &[&str] = &[
             "InitialLogicalPlan",
+            "AnalyzedLogicalPlan",
+            "FinalAnalyzedLogicalPlan",
             "OptimizedLogicalPlan",
             "FinalLogicalPlan",
             "InitialPhysicalPlan",
@@ -15961,6 +16061,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             InitialLogicalPlan,
+            AnalyzedLogicalPlan,
+            FinalAnalyzedLogicalPlan,
             OptimizedLogicalPlan,
             FinalLogicalPlan,
             InitialPhysicalPlan,
@@ -15988,6 +16090,8 @@ impl<'de> serde::Deserialize<'de> for PlanType {
                     {
                         match value {
                             "InitialLogicalPlan" => Ok(GeneratedField::InitialLogicalPlan),
+                            "AnalyzedLogicalPlan" => Ok(GeneratedField::AnalyzedLogicalPlan),
+                            "FinalAnalyzedLogicalPlan" => Ok(GeneratedField::FinalAnalyzedLogicalPlan),
                             "OptimizedLogicalPlan" => Ok(GeneratedField::OptimizedLogicalPlan),
                             "FinalLogicalPlan" => Ok(GeneratedField::FinalLogicalPlan),
                             "InitialPhysicalPlan" => Ok(GeneratedField::InitialPhysicalPlan),
@@ -16020,6 +16124,20 @@ impl<'de> serde::Deserialize<'de> for PlanType {
                                 return Err(serde::de::Error::duplicate_field("InitialLogicalPlan"));
                             }
                             plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::InitialLogicalPlan)
+;
+                        }
+                        GeneratedField::AnalyzedLogicalPlan => {
+                            if plan_type_enum__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("AnalyzedLogicalPlan"));
+                            }
+                            plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::AnalyzedLogicalPlan)
+;
+                        }
+                        GeneratedField::FinalAnalyzedLogicalPlan => {
+                            if plan_type_enum__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("FinalAnalyzedLogicalPlan"));
+                            }
+                            plan_type_enum__ = map.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::FinalAnalyzedLogicalPlan)
 ;
                         }
                         GeneratedField::OptimizedLogicalPlan => {
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 7e2d10ba83..84689e56f5 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1245,6 +1245,12 @@ pub mod arrow_type {
 pub struct EmptyMessage {}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AnalyzedLogicalPlanType {
+    #[prost(string, tag = "1")]
+    pub analyzer_name: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OptimizedLogicalPlanType {
     #[prost(string, tag = "1")]
     pub optimizer_name: ::prost::alloc::string::String,
@@ -1258,7 +1264,7 @@ pub struct OptimizedPhysicalPlanType {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PlanType {
-    #[prost(oneof = "plan_type::PlanTypeEnum", tags = "1, 2, 3, 4, 5, 6")]
+    #[prost(oneof = "plan_type::PlanTypeEnum", tags = "1, 7, 8, 2, 3, 4, 5, 6")]
     pub plan_type_enum: ::core::option::Option<plan_type::PlanTypeEnum>,
 }
 /// Nested message and enum types in `PlanType`.
@@ -1268,6 +1274,10 @@ pub mod plan_type {
     pub enum PlanTypeEnum {
         #[prost(message, tag = "1")]
         InitialLogicalPlan(super::EmptyMessage),
+        #[prost(message, tag = "7")]
+        AnalyzedLogicalPlan(super::AnalyzedLogicalPlanType),
+        #[prost(message, tag = "8")]
+        FinalAnalyzedLogicalPlan(super::EmptyMessage),
         #[prost(message, tag = "2")]
         OptimizedLogicalPlan(super::OptimizedLogicalPlanType),
         #[prost(message, tag = "3")]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index bd544b93bd..fdb048c78e 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -18,11 +18,12 @@
 use crate::protobuf::{
     self,
     plan_type::PlanTypeEnum::{
-        FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
-        OptimizedLogicalPlan, OptimizedPhysicalPlan,
+        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
+        FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, OptimizedLogicalPlan,
+        OptimizedPhysicalPlan,
     },
-    CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
-    PlaceholderNode, RollupNode,
+    AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
+    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
 };
 use arrow::datatypes::{
     DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit,
@@ -377,6 +378,12 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
                     )
                 }) {
                 InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
+                AnalyzedLogicalPlan(AnalyzedLogicalPlanType { analyzer_name }) => {
+                    PlanType::AnalyzedLogicalPlan {
+                        analyzer_name:analyzer_name.clone()
+                    }
+                }
+                FinalAnalyzedLogicalPlan(_) => PlanType::FinalAnalyzedLogicalPlan,
                 OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
                     PlanType::OptimizedLogicalPlan {
                         optimizer_name: optimizer_name.clone(),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 7e6aa6b687..53bc8a7dce 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -23,11 +23,12 @@ use crate::protobuf::{
     self,
     arrow_type::ArrowTypeEnum,
     plan_type::PlanTypeEnum::{
-        FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
-        OptimizedLogicalPlan, OptimizedPhysicalPlan,
+        AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
+        FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, OptimizedLogicalPlan,
+        OptimizedPhysicalPlan,
     },
-    CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, OptimizedLogicalPlanType,
-    OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
+    AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
+    OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
 };
 use arrow::datatypes::{
     DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit,
@@ -318,6 +319,16 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
                 PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
                     plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
                 }),
+                PlanType::AnalyzedLogicalPlan { analyzer_name } => {
+                    Some(protobuf::PlanType {
+                        plan_type_enum: Some(AnalyzedLogicalPlan(
+                            AnalyzedLogicalPlanType { analyzer_name },
+                        )),
+                    })
+                }
+                PlanType::FinalAnalyzedLogicalPlan => Some(protobuf::PlanType {
+                    plan_type_enum: Some(FinalAnalyzedLogicalPlan(EmptyMessage {})),
+                }),
                 PlanType::OptimizedLogicalPlan { optimizer_name } => {
                     Some(protobuf::PlanType {
                         plan_type_enum: Some(OptimizedLogicalPlan(