You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/18 12:12:09 UTC

[arrow-datafusion] branch master updated: Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b8ff94c  Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)
b8ff94c is described below

commit b8ff94ce9ebdf861c595c06a78860b5e1dbf2d33
Author: Kun Liu <li...@apache.org>
AuthorDate: Thu Nov 18 20:12:04 2021 +0800

    Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)
    
    * refactor the plan for: table ddl
    
    * fix the conflict for import
---
 ballista/rust/client/src/context.rs                |   7 +-
 .../rust/core/src/serde/logical_plan/from_proto.rs |   8 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  18 ++--
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  12 +--
 ballista/rust/core/src/utils.rs                    |   2 +-
 datafusion/src/execution/context.rs                |  13 +--
 datafusion/src/logical_plan/mod.rs                 |   3 +-
 datafusion/src/logical_plan/plan.rs                | 117 +++++++++++++--------
 .../src/optimizer/common_subexpr_eliminate.rs      |   6 +-
 datafusion/src/optimizer/constant_folding.rs       |   6 +-
 datafusion/src/optimizer/filter_push_down.rs       |   3 +-
 datafusion/src/optimizer/limit_push_down.rs        |   2 +-
 datafusion/src/optimizer/projection_push_down.rs   |   6 +-
 datafusion/src/optimizer/utils.rs                  |  15 +--
 datafusion/src/physical_plan/planner.rs            |   6 +-
 datafusion/src/sql/planner.rs                      |  16 +--
 datafusion/tests/custom_sources.rs                 |   2 +-
 datafusion/tests/sql.rs                            |   2 +-
 18 files changed, 137 insertions(+), 107 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 7eb8a1a..00a92de 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -30,8 +30,7 @@ use datafusion::dataframe::DataFrame;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
-use datafusion::logical_plan::plan::TableScanPlan;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScanPlan};
 use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
 use datafusion::sql::parser::FileType;
 
@@ -270,13 +269,13 @@ impl BallistaContext {
 
         let plan = ctx.create_logical_plan(sql)?;
         match plan {
-            LogicalPlan::CreateExternalTable {
+            LogicalPlan::CreateExternalTable(CreateExternalTable {
                 ref schema,
                 ref name,
                 ref location,
                 ref file_type,
                 ref has_header,
-            } => match file_type {
+            }) => match file_type {
                 FileType::CSV => {
                     self.register_csv(
                         name,
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 66bec78..ba40488 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -33,8 +33,8 @@ use datafusion::logical_plan::window_frames::{
 };
 use datafusion::logical_plan::{
     abs, acos, asin, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum,
-    sin, sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
-    LogicalPlan, LogicalPlanBuilder, Operator,
+    sin, sqrt, tan, trunc, Column, CreateExternalTable, DFField, DFSchema, Expr,
+    JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -271,13 +271,13 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 let pb_file_type: protobuf::FileType =
                     create_extern_table.file_type.try_into()?;
 
-                Ok(LogicalPlan::CreateExternalTable {
+                Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
                     schema: pb_schema.try_into()?,
                     name: create_extern_table.name.clone(),
                     location: create_extern_table.location.clone(),
                     file_type: pb_file_type.into(),
                     has_header: create_extern_table.has_header,
-                })
+                }))
             }
             LogicalPlanType::Analyze(analyze) => {
                 let input: LogicalPlan = convert_box_required!(analyze.input)?;
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 09bcf1f..472399b 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,7 +28,8 @@ mod roundtrip_tests {
         arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
         datasource::object_store::local::LocalFileSystem,
         logical_plan::{
-            col, Expr, LogicalPlan, LogicalPlanBuilder, Partitioning, ToDFSchema,
+            col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
+            Partitioning, ToDFSchema,
         },
         physical_plan::functions::BuiltinScalarFunction::Sqrt,
         prelude::*,
@@ -655,13 +656,14 @@ mod roundtrip_tests {
         ];
 
         for file in filetypes.iter() {
-            let create_table_node = LogicalPlan::CreateExternalTable {
-                schema: df_schema_ref.clone(),
-                name: String::from("TestName"),
-                location: String::from("employee.csv"),
-                file_type: *file,
-                has_header: true,
-            };
+            let create_table_node =
+                LogicalPlan::CreateExternalTable(CreateExternalTable {
+                    schema: df_schema_ref.clone(),
+                    name: String::from("TestName"),
+                    location: String::from("employee.csv"),
+                    file_type: *file,
+                    has_header: true,
+                });
 
             roundtrip_test!(create_table_node);
         }
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index da36c9f..825a4b9 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -32,9 +32,9 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::logical_plan::{
     exprlist_to_fields,
-    plan::TableScanPlan,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
-    Column, Expr, JoinConstraint, JoinType, LogicalPlan,
+    Column, CreateExternalTable, Expr, JoinConstraint, JoinType, LogicalPlan,
+    TableScanPlan,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -945,13 +945,13 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     },
                 )),
             }),
-            LogicalPlan::CreateExternalTable {
+            LogicalPlan::CreateExternalTable(CreateExternalTable {
                 name,
                 location,
                 file_type,
                 has_header,
                 schema: df_schema,
-            } => {
+            }) => {
                 use datafusion::sql::parser::FileType;
 
                 let pb_file_type: protobuf::FileType = match file_type {
@@ -1009,10 +1009,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
+            LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
                 "Error converting CreateMemoryTable. Not yet supported in Ballista",
             )),
-            LogicalPlan::DropTable { .. } => Err(proto_error(
+            LogicalPlan::DropTable(_) => Err(proto_error(
                 "Error converting DropTable. Not yet supported in Ballista",
             )),
         }
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 80391b3..2dfdb3d 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -277,7 +277,7 @@ impl QueryPlanner for BallistaQueryPlanner {
         _ctx_state: &ExecutionContextState,
     ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         match logical_plan {
-            LogicalPlan::CreateExternalTable { .. } => {
+            LogicalPlan::CreateExternalTable(_) => {
                 // table state is managed locally in the BallistaContext, not in the scheduler
                 Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
             }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 08e2e70..52c5160 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -63,7 +63,8 @@ use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::execution::dataframe_impl::DataFrameImpl;
 use crate::logical_plan::{
-    FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+    CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry, LogicalPlan,
+    LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::optimizer::constant_folding::ConstantFolding;
@@ -191,13 +192,13 @@ impl ExecutionContext {
     pub async fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
         let plan = self.create_logical_plan(sql)?;
         match plan {
-            LogicalPlan::CreateExternalTable {
+            LogicalPlan::CreateExternalTable(CreateExternalTable {
                 ref schema,
                 ref name,
                 ref location,
                 ref file_type,
                 ref has_header,
-            } => {
+            }) => {
                 let file_format = match file_type {
                     FileType::CSV => {
                         Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
@@ -241,7 +242,7 @@ impl ExecutionContext {
                 Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
             }
 
-            LogicalPlan::CreateMemoryTable { input, name } => {
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, input }) => {
                 let plan = self.optimize(&input)?;
                 let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));
 
@@ -256,7 +257,7 @@ impl ExecutionContext {
                 Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
             }
 
-            LogicalPlan::DropTable { name, if_exist, .. } => {
+            LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
                 let returned = self.deregister_table(name.as_str())?;
                 if !if_exist && returned.is_none() {
                     Err(DataFusionError::Execution(format!(
@@ -1174,7 +1175,7 @@ impl FunctionRegistry for ExecutionContextState {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::logical_plan::plan::TableScanPlan;
+    use crate::logical_plan::TableScanPlan;
     use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
     use crate::physical_plan::{collect, collect_partitioned};
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 5db6a99..77193eb 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -51,7 +51,8 @@ pub use expr::{
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
 pub use plan::{
-    JoinConstraint, JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor,
+    CreateExternalTable, CreateMemoryTable, DropTable, JoinConstraint, JoinType,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, TableScanPlan,
 };
 pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
 pub use registry::FunctionRegistry;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d9f3d9a..ea56bc0 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -74,6 +74,41 @@ pub struct TableScanPlan {
     pub limit: Option<usize>,
 }
 
+/// Creates an in memory table.
+#[derive(Clone)]
+pub struct CreateMemoryTable {
+    /// The table name
+    pub name: String,
+    /// The logical plan
+    pub input: Arc<LogicalPlan>,
+}
+
+/// Creates an external table.
+#[derive(Clone)]
+pub struct CreateExternalTable {
+    /// The table schema
+    pub schema: DFSchemaRef,
+    /// The table name
+    pub name: String,
+    /// The physical location
+    pub location: String,
+    /// The file type of physical file
+    pub file_type: FileType,
+    /// Whether the CSV file contains a header
+    pub has_header: bool,
+}
+
+/// Drops a table.
+#[derive(Clone)]
+pub struct DropTable {
+    /// The table name
+    pub name: String,
+    /// If the table exists
+    pub if_exist: bool,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+
 /// Produces a relation with string representations of
 /// various parts of the plan
 #[derive(Clone)]
@@ -230,34 +265,11 @@ pub enum LogicalPlan {
         input: Arc<LogicalPlan>,
     },
     /// Creates an external table.
-    CreateExternalTable {
-        /// The table schema
-        schema: DFSchemaRef,
-        /// The table name
-        name: String,
-        /// The physical location
-        location: String,
-        /// The file type of physical file
-        file_type: FileType,
-        /// Whether the CSV file contains a header
-        has_header: bool,
-    },
+    CreateExternalTable(CreateExternalTable),
     /// Creates an in memory table.
-    CreateMemoryTable {
-        /// The table name
-        name: String,
-        /// The logical plan
-        input: Arc<LogicalPlan>,
-    },
+    CreateMemoryTable(CreateMemoryTable),
     /// Drops a table.
-    DropTable {
-        /// The table name
-        name: String,
-        /// If the table exists
-        if_exist: bool,
-        /// Dummy schema
-        schema: DFSchemaRef,
-    },
+    DropTable(DropTable),
     /// Values expression. See
     /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
     /// documentation for more details.
@@ -295,13 +307,17 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin { schema, .. } => schema,
             LogicalPlan::Repartition { input, .. } => input.schema(),
             LogicalPlan::Limit { input, .. } => input.schema(),
-            LogicalPlan::CreateExternalTable { schema, .. } => schema,
+            LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
+                schema
+            }
             LogicalPlan::Explain(explain) => &explain.schema,
             LogicalPlan::Analyze(analyze) => &analyze.schema,
             LogicalPlan::Extension(extension) => extension.node.schema(),
             LogicalPlan::Union { schema, .. } => schema,
-            LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
-            LogicalPlan::DropTable { schema, .. } => schema,
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+                input.schema()
+            }
+            LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
         }
     }
 
@@ -342,13 +358,15 @@ impl LogicalPlan {
             LogicalPlan::Explain(ExplainPlan { schema, .. })
             | LogicalPlan::Analyze(AnalyzePlan { schema, .. })
             | LogicalPlan::EmptyRelation { schema, .. }
-            | LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
+            | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
+                vec![schema]
+            }
             LogicalPlan::Limit { input, .. }
             | LogicalPlan::Repartition { input, .. }
             | LogicalPlan::Sort { input, .. }
-            | LogicalPlan::CreateMemoryTable { input, .. }
+            | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
             | LogicalPlan::Filter { input, .. } => input.all_schemas(),
-            LogicalPlan::DropTable { .. } => vec![],
+            LogicalPlan::DropTable(_) => vec![],
         }
     }
 
@@ -393,9 +411,9 @@ impl LogicalPlan {
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Limit { .. }
-            | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::CreateMemoryTable { .. }
-            | LogicalPlan::DropTable { .. }
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::DropTable(_)
             | LogicalPlan::CrossJoin { .. }
             | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
@@ -422,13 +440,15 @@ impl LogicalPlan {
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
-            LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+                vec![input]
+            }
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::DropTable { .. } => vec![],
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::DropTable(_) => vec![],
         }
     }
 
@@ -561,7 +581,9 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
-            LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+                input.accept(visitor)?
+            }
             LogicalPlan::Extension(extension) => {
                 for input in extension.node.inputs() {
                     if !input.accept(visitor)? {
@@ -576,8 +598,8 @@ impl LogicalPlan {
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::DropTable { .. } => true,
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::DropTable(_) => true,
         };
         if !recurse {
             return Ok(false);
@@ -889,15 +911,18 @@ impl LogicalPlan {
                         }
                     },
                     LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n),
-                    LogicalPlan::CreateExternalTable { ref name, .. } => {
+                    LogicalPlan::CreateExternalTable(CreateExternalTable {
+                        ref name,
+                        ..
+                    }) => {
                         write!(f, "CreateExternalTable: {:?}", name)
                     }
-                    LogicalPlan::CreateMemoryTable { ref name, .. } => {
+                    LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+                        name, ..
+                    }) => {
                         write!(f, "CreateMemoryTable: {:?}", name)
                     }
-                    LogicalPlan::DropTable {
-                        ref name, if_exist, ..
-                    } => {
+                    LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
                         write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
                     }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index c88032e..9e7a9dd 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -202,11 +202,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::Values { .. }
         | LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::Limit { .. }
-        | LogicalPlan::CreateExternalTable { .. }
+        | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
-        | LogicalPlan::CreateMemoryTable { .. }
-        | LogicalPlan::DropTable { .. }
+        | LogicalPlan::CreateMemoryTable(_)
+        | LogicalPlan::DropTable(_)
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index a0bc04a..06bac6c 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -69,9 +69,9 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Window { .. }
             | LogicalPlan::Aggregate { .. }
             | LogicalPlan::Repartition { .. }
-            | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::CreateMemoryTable { .. }
-            | LogicalPlan::DropTable { .. }
+            | LogicalPlan::CreateExternalTable(_)
+            | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::DropTable(_)
             | LogicalPlan::Values { .. }
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index a2bf6f8..1bb1a18 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,8 +16,7 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::TableScanPlan;
-use crate::logical_plan::{and, replace_col, Column, LogicalPlan};
+use crate::logical_plan::{and, replace_col, Column, LogicalPlan, TableScanPlan};
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index ff8b8c1..842ca08 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,8 +20,8 @@
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::TableScanPlan;
 use crate::logical_plan::LogicalPlan;
+use crate::logical_plan::TableScanPlan;
 use crate::optimizer::optimizer::OptimizerRule;
 use std::sync::Arc;
 
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index e1abb5c..f3a7c61 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -433,9 +433,9 @@ fn optimize_plan(
         | LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::Values { .. }
         | LogicalPlan::Sort { .. }
-        | LogicalPlan::CreateExternalTable { .. }
-        | LogicalPlan::CreateMemoryTable { .. }
-        | LogicalPlan::DropTable { .. }
+        | LogicalPlan::CreateExternalTable(_)
+        | LogicalPlan::CreateMemoryTable(_)
+        | LogicalPlan::DropTable(_)
         | LogicalPlan::CrossJoin { .. }
         | LogicalPlan::Extension { .. } => {
             let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 259c35a..fe14442 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -25,8 +25,9 @@ use super::optimizer::OptimizerRule;
 use crate::execution::context::{ExecutionContextState, ExecutionProps};
 use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
 use crate::logical_plan::{
-    build_join_schema, Column, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan,
-    LogicalPlanBuilder, Operator, Partitioning, Recursion, RewriteRecursion,
+    build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
+    ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
+    RewriteRecursion,
 };
 use crate::physical_plan::functions::Volatility;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -223,11 +224,11 @@ pub fn from_plan(
             n: *n,
             input: Arc::new(inputs[0].clone()),
         }),
-        LogicalPlan::CreateMemoryTable { name, .. } => {
-            Ok(LogicalPlan::CreateMemoryTable {
+        LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, .. }) => {
+            Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                 input: Arc::new(inputs[0].clone()),
                 name: name.clone(),
-            })
+            }))
         }
         LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan {
             node: e.node.from_template(expr, inputs),
@@ -262,8 +263,8 @@ pub fn from_plan(
         }
         LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::TableScan { .. }
-        | LogicalPlan::CreateExternalTable { .. }
-        | LogicalPlan::DropTable { .. } => {
+        | LogicalPlan::CreateExternalTable(_)
+        | LogicalPlan::DropTable(_) => {
             // All of these plan types have no inputs / exprs so should not be called
             assert!(expr.is_empty(), "{:?} should have no exprs", plan);
             assert!(inputs.is_empty(), "{:?}  should have no inputs", plan);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5287e71..25ed0e4 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,7 @@ use super::{
     hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
 };
 use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::TableScanPlan;
+use crate::logical_plan::TableScanPlan;
 use crate::logical_plan::{
     unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
     Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
@@ -803,7 +803,7 @@ impl DefaultPhysicalPlanner {
 
                     Ok(Arc::new(GlobalLimitExec::new(input, limit)))
                 }
-                LogicalPlan::CreateExternalTable { .. } => {
+                LogicalPlan::CreateExternalTable(_) => {
                     // There is no default plan for "CREATE EXTERNAL
                     // TABLE" -- it must be handled at a higher level (so
                     // that the appropriate table can be registered with
@@ -812,7 +812,7 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateExternalTable".to_string(),
                     ))
                 }
-                | LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => {
+                | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => {
                     // Create a dummy exec.
                     Ok(Arc::new(EmptyExec::new(
                         false,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 87c3749..575acd0 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -29,9 +29,11 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
     and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
-    DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
+    CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema,
+    DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
     ToDFSchema, ToStringifiedPlan,
 };
+
 use crate::optimizer::utils::exprlist_to_columns;
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
@@ -158,10 +160,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             {
                 let plan = self.query_to_plan(query)?;
 
-                Ok(LogicalPlan::CreateMemoryTable {
+                Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                     name: name.to_string(),
                     input: Arc::new(plan),
-                })
+                }))
             }
             Statement::CreateTable { .. } => Err(DataFusionError::NotImplemented(
                 "Only `CREATE TABLE table_name AS SELECT ...` statement is supported"
@@ -177,11 +179,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } =>
             // We don't support cascade and purge for now.
             {
-                Ok(LogicalPlan::DropTable {
+                Ok(LogicalPlan::DropTable(DropTable {
                     name: names.get(0).unwrap().to_string(),
                     if_exist: *if_exists,
                     schema: DFSchemaRef::new(DFSchema::empty()),
-                })
+                }))
             }
 
             Statement::ShowColumns {
@@ -306,13 +308,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let schema = self.build_schema(columns)?;
 
-        Ok(LogicalPlan::CreateExternalTable {
+        Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
             schema: schema.to_dfschema_ref()?,
             name: name.clone(),
             location: location.clone(),
             file_type: *file_type,
             has_header: *has_header,
-        })
+        }))
     }
 
     /// Generate a plan for EXPLAIN ... that will print out a plan
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index de44b82..25040a6 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -31,7 +31,7 @@ use datafusion::{
 
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{
-    col, plan::TableScanPlan, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+    col, Expr, LogicalPlan, LogicalPlanBuilder, TableScanPlan, UNNAMED_TABLE,
 };
 use datafusion::physical_plan::{
     ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index b06b170..63d4d69 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,8 +36,8 @@ use datafusion::assert_batches_eq;
 use datafusion::assert_batches_sorted_eq;
 use datafusion::assert_contains;
 use datafusion::assert_not_contains;
-use datafusion::logical_plan::plan::TableScanPlan;
 use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_plan::TableScanPlan;
 use datafusion::physical_plan::functions::Volatility;
 use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;