You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/22 12:03:51 UTC

[arrow-datafusion] branch master updated: Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a607861  Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)
a607861 is described below

commit a607861e53de28400adc58ea82f500fc87b7fdad
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Nov 22 20:03:25 2021 +0800

    Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)
    
    * Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302)
    
    * Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302)
    
    * solve conflicts
    
    * solve conflicts
    
    * solve conflicts
    
    Co-authored-by: liuli <li...@analysys.com.cn>
---
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  12 +--
 datafusion/src/execution/context.rs                |   5 +-
 datafusion/src/logical_plan/builder.rs             |  15 +--
 datafusion/src/logical_plan/plan.rs                | 110 ++++++++++++---------
 .../src/optimizer/common_subexpr_eliminate.rs      |  27 ++---
 datafusion/src/optimizer/filter_push_down.rs       |  11 ++-
 datafusion/src/optimizer/limit_push_down.rs        |   9 +-
 datafusion/src/optimizer/projection_push_down.rs   |  18 ++--
 .../src/optimizer/single_distinct_to_groupby.rs    |   5 +-
 datafusion/src/optimizer/utils.rs                  |  26 ++---
 datafusion/src/physical_plan/planner.rs            |  12 +--
 datafusion/tests/custom_sources.rs                 |   3 +-
 datafusion/tests/sql.rs                            |   3 +-
 13 files changed, 143 insertions(+), 113 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 9c13810..0e0b989 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider;
 
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::EmptyRelation;
+use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
 use datafusion::logical_plan::{
     exprlist_to_fields,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -779,9 +779,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     )))
                 }
             }
-            LogicalPlan::Projection {
+            LogicalPlan::Projection(Projection {
                 expr, input, alias, ..
-            } => Ok(protobuf::LogicalPlanNode {
+            }) => Ok(protobuf::LogicalPlanNode {
                 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
                     protobuf::ProjectionNode {
                         input: Some(Box::new(input.as_ref().try_into()?)),
@@ -796,7 +796,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     },
                 ))),
             }),
-            LogicalPlan::Filter { predicate, input } => {
+            LogicalPlan::Filter(Filter { predicate, input }) => {
                 let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
@@ -807,9 +807,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::Window {
+            LogicalPlan::Window(Window {
                 input, window_expr, ..
-            } => {
+            }) => {
                 let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::Window(Box::new(
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 5f77b2b..9e5ac1f 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1177,6 +1177,7 @@ impl FunctionRegistry for ExecutionContextState {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::logical_plan::plan::Projection;
     use crate::logical_plan::TableScanPlan;
     use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
@@ -1415,7 +1416,7 @@ mod tests {
 
         let optimized_plan = ctx.optimize(&logical_plan)?;
         match &optimized_plan {
-            LogicalPlan::Projection { input, .. } => match &**input {
+            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
                 LogicalPlan::TableScan(TableScanPlan {
                     source,
                     projected_schema,
@@ -1488,7 +1489,7 @@ mod tests {
         let ctx = ExecutionContext::new();
         let optimized_plan = ctx.optimize(&plan)?;
         match &optimized_plan {
-            LogicalPlan::Projection { input, .. } => match &**input {
+            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
                 LogicalPlan::TableScan(TableScanPlan {
                     source,
                     projected_schema,
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 94e53d6..2eb0091 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,7 +26,8 @@ use crate::datasource::{
 };
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::plan::{
-    AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
+    AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan,
+    ToStringifiedPlan, Union, Window,
 };
 use crate::prelude::*;
 use crate::scalar::ScalarValue;
@@ -451,10 +452,10 @@ impl LogicalPlanBuilder {
     /// Apply a filter
     pub fn filter(&self, expr: impl Into<Expr>) -> Result<Self> {
         let expr = normalize_col(expr.into(), &self.plan)?;
-        Ok(Self::from(LogicalPlan::Filter {
+        Ok(Self::from(LogicalPlan::Filter(Filter {
             predicate: expr,
             input: Arc::new(self.plan.clone()),
-        }))
+        })))
     }
 
     /// Apply a limit
@@ -658,11 +659,11 @@ impl LogicalPlanBuilder {
         let mut window_fields: Vec<DFField> =
             exprlist_to_fields(all_expr, self.plan.schema())?;
         window_fields.extend_from_slice(self.plan.schema().fields());
-        Ok(Self::from(LogicalPlan::Window {
+        Ok(Self::from(LogicalPlan::Window(Window {
             input: Arc::new(self.plan.clone()),
             window_expr,
             schema: Arc::new(DFSchema::new(window_fields)?),
-        }))
+        })))
     }
 
     /// Apply an aggregate: grouping on the `group_expr` expressions
@@ -898,12 +899,12 @@ pub fn project_with_alias(
         Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
         None => input_schema,
     };
-    Ok(LogicalPlan::Projection {
+    Ok(LogicalPlan::Projection(Projection {
         expr: projected_expr,
         input: Arc::new(plan.clone()),
         schema: DFSchemaRef::new(schema),
         alias,
-    })
+    }))
 }
 
 /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 31de255..a7aac90 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -57,6 +57,47 @@ pub enum JoinConstraint {
     Using,
 }
 
+/// Evaluates an arbitrary list of expressions (essentially a
+/// SELECT with an expression list) on its input.
+#[derive(Clone)]
+pub struct Projection {
+    /// The list of expressions
+    pub expr: Vec<Expr>,
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+    /// The schema description of the output
+    pub schema: DFSchemaRef,
+    /// Projection output relation alias
+    pub alias: Option<String>,
+}
+
+/// Filters rows from its input that do not match an
+/// expression (essentially a WHERE clause with a predicate
+/// expression).
+///
+/// Semantically, `<predicate>` is evaluated for each row of the input;
+/// If the value of `<predicate>` is true, the input row is passed to
+/// the output. If the value of `<predicate>` is false, the row is
+/// discarded.
+#[derive(Clone)]
+pub struct Filter {
+    /// The predicate expression, which must have Boolean type.
+    pub predicate: Expr,
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+}
+
+/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
+#[derive(Clone)]
+pub struct Window {
+    /// The incoming logical plan
+    pub input: Arc<LogicalPlan>,
+    /// The window function expression
+    pub window_expr: Vec<Expr>,
+    /// The schema description of the window output
+    pub schema: DFSchemaRef,
+}
+
 /// Produces rows from a table provider by reference or from the context
 #[derive(Clone)]
 pub struct TableScanPlan {
@@ -214,16 +255,7 @@ pub struct Values {
 pub enum LogicalPlan {
     /// Evaluates an arbitrary list of expressions (essentially a
     /// SELECT with an expression list) on its input.
-    Projection {
-        /// The list of expressions
-        expr: Vec<Expr>,
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-        /// The schema description of the output
-        schema: DFSchemaRef,
-        /// Projection output relation alias
-        alias: Option<String>,
-    },
+    Projection(Projection),
     /// Filters rows from its input that do not match an
     /// expression (essentially a WHERE clause with a predicate
     /// expression).
@@ -232,21 +264,9 @@ pub enum LogicalPlan {
     /// If the value of `<predicate>` is true, the input row is passed to
     /// the output. If the value of `<predicate>` is false, the row is
     /// discarded.
-    Filter {
-        /// The predicate expression, which must have Boolean type.
-        predicate: Expr,
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-    },
+    Filter(Filter),
     /// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
-    Window {
-        /// The incoming logical plan
-        input: Arc<LogicalPlan>,
-        /// The window function expression
-        window_expr: Vec<Expr>,
-        /// The schema description of the window output
-        schema: DFSchemaRef,
-    },
+    Window(Window),
     /// Aggregates its input based on a set of grouping and aggregate
     /// expressions (e.g. SUM).
     Aggregate {
@@ -324,9 +344,9 @@ impl LogicalPlan {
             LogicalPlan::TableScan(TableScanPlan {
                 projected_schema, ..
             }) => projected_schema,
-            LogicalPlan::Projection { schema, .. } => schema,
-            LogicalPlan::Filter { input, .. } => input.schema(),
-            LogicalPlan::Window { schema, .. } => schema,
+            LogicalPlan::Projection(Projection { schema, .. }) => schema,
+            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
+            LogicalPlan::Window(Window { schema, .. }) => schema,
             LogicalPlan::Aggregate { schema, .. } => schema,
             LogicalPlan::Sort { input, .. } => input.schema(),
             LogicalPlan::Join { schema, .. } => schema,
@@ -354,9 +374,9 @@ impl LogicalPlan {
                 projected_schema, ..
             }) => vec![projected_schema],
             LogicalPlan::Values(Values { schema, .. }) => vec![schema],
-            LogicalPlan::Window { input, schema, .. }
+            LogicalPlan::Window(Window { input, schema, .. })
             | LogicalPlan::Aggregate { input, schema, .. }
-            | LogicalPlan::Projection { input, schema, .. } => {
+            | LogicalPlan::Projection(Projection { input, schema, .. }) => {
                 let mut schemas = input.all_schemas();
                 schemas.insert(0, schema);
                 schemas
@@ -391,7 +411,7 @@ impl LogicalPlan {
             | LogicalPlan::Repartition(Repartition { input, .. })
             | LogicalPlan::Sort { input, .. }
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
-            | LogicalPlan::Filter { input, .. } => input.all_schemas(),
+            | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
         }
     }
@@ -409,11 +429,11 @@ impl LogicalPlan {
     /// children
     pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
         match self {
-            LogicalPlan::Projection { expr, .. } => expr.clone(),
+            LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
             LogicalPlan::Values(Values { values, .. }) => {
                 values.iter().flatten().cloned().collect()
             }
-            LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
+            LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
             LogicalPlan::Repartition(Repartition {
                 partitioning_scheme,
                 ..
@@ -421,7 +441,7 @@ impl LogicalPlan {
                 Partitioning::Hash(expr, _) => expr.clone(),
                 _ => vec![],
             },
-            LogicalPlan::Window { window_expr, .. } => window_expr.clone(),
+            LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
             LogicalPlan::Aggregate {
                 group_expr,
                 aggr_expr,
@@ -453,10 +473,10 @@ impl LogicalPlan {
     /// include inputs to inputs.
     pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
         match self {
-            LogicalPlan::Projection { input, .. } => vec![input],
-            LogicalPlan::Filter { input, .. } => vec![input],
+            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
+            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
             LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
-            LogicalPlan::Window { input, .. } => vec![input],
+            LogicalPlan::Window(Window { input, .. }) => vec![input],
             LogicalPlan::Aggregate { input, .. } => vec![input],
             LogicalPlan::Sort { input, .. } => vec![input],
             LogicalPlan::Join { left, right, .. } => vec![left, right],
@@ -588,12 +608,12 @@ impl LogicalPlan {
         }
 
         let recurse = match self {
-            LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
-            LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
+            LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
+            LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
             LogicalPlan::Repartition(Repartition { input, .. }) => {
                 input.accept(visitor)?
             }
-            LogicalPlan::Window { input, .. } => input.accept(visitor)?,
+            LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
             LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
             LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
             LogicalPlan::Join { left, right, .. }
@@ -856,9 +876,9 @@ impl LogicalPlan {
 
                         Ok(())
                     }
-                    LogicalPlan::Projection {
+                    LogicalPlan::Projection(Projection {
                         ref expr, alias, ..
-                    } => {
+                    }) => {
                         write!(f, "Projection: ")?;
                         for (i, expr_item) in expr.iter().enumerate() {
                             if i > 0 {
@@ -871,13 +891,13 @@ impl LogicalPlan {
                         }
                         Ok(())
                     }
-                    LogicalPlan::Filter {
+                    LogicalPlan::Filter(Filter {
                         predicate: ref expr,
                         ..
-                    } => write!(f, "Filter: {:?}", expr),
-                    LogicalPlan::Window {
+                    }) => write!(f, "Filter: {:?}", expr),
+                    LogicalPlan::Window(Window {
                         ref window_expr, ..
-                    } => {
+                    }) => {
                         write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
                     }
                     LogicalPlan::Aggregate {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 930f0b8..ced2c0c 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -19,6 +19,7 @@
 
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Projection, Window};
 use crate::logical_plan::{
     col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan,
     Recursion, RewriteRecursion,
@@ -77,12 +78,12 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
     let mut expr_set = ExprSet::new();
 
     match plan {
-        LogicalPlan::Projection {
+        LogicalPlan::Projection(Projection {
             expr,
             input,
             schema,
             alias,
-        } => {
+        }) => {
             let arrays = to_arrays(expr, input, &mut expr_set)?;
 
             let (mut new_expr, new_input) = rewrite_expr(
@@ -94,14 +95,14 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 execution_props,
             )?;
 
-            Ok(LogicalPlan::Projection {
+            Ok(LogicalPlan::Projection(Projection {
                 expr: new_expr.pop().unwrap(),
                 input: Arc::new(new_input),
                 schema: schema.clone(),
                 alias: alias.clone(),
-            })
+            }))
         }
-        LogicalPlan::Filter { predicate, input } => {
+        LogicalPlan::Filter(Filter { predicate, input }) => {
             let schemas = plan.all_schemas();
             let all_schema =
                 schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| {
@@ -122,16 +123,16 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 execution_props,
             )?;
 
-            Ok(LogicalPlan::Filter {
+            Ok(LogicalPlan::Filter(Filter {
                 predicate: new_expr.pop().unwrap().pop().unwrap(),
                 input: Arc::new(new_input),
-            })
+            }))
         }
-        LogicalPlan::Window {
+        LogicalPlan::Window(Window {
             input,
             window_expr,
             schema,
-        } => {
+        }) => {
             let arrays = to_arrays(window_expr, input, &mut expr_set)?;
 
             let (mut new_expr, new_input) = rewrite_expr(
@@ -143,11 +144,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 execution_props,
             )?;
 
-            Ok(LogicalPlan::Window {
+            Ok(LogicalPlan::Window(Window {
                 input: Arc::new(new_input),
                 window_expr: new_expr.pop().unwrap(),
                 schema: schema.clone(),
-            })
+            }))
         }
         LogicalPlan::Aggregate {
             input,
@@ -265,12 +266,12 @@ fn build_project_plan(
     let mut schema = DFSchema::new(fields)?;
     schema.merge(input.schema());
 
-    Ok(LogicalPlan::Projection {
+    Ok(LogicalPlan::Projection(Projection {
         expr: project_exprs,
         input: Arc::new(input),
         schema: Arc::new(schema),
         alias: None,
-    })
+    }))
 }
 
 #[inline]
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 7e5c2c1..4214a57 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,6 +16,7 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Projection};
 use crate::logical_plan::{
     and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
 };
@@ -181,10 +182,10 @@ fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
             and(acc, (*predicate).to_owned())
         });
 
-    LogicalPlan::Filter {
+    LogicalPlan::Filter(Filter {
         predicate,
         input: Arc::new(plan),
-    }
+    })
 }
 
 // remove all filters from `filters` that are in `predicate_columns`
@@ -287,7 +288,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             push_down(&state, plan)
         }
         LogicalPlan::Analyze { .. } => push_down(&state, plan),
-        LogicalPlan::Filter { input, predicate } => {
+        LogicalPlan::Filter(Filter { input, predicate }) => {
             let mut predicates = vec![];
             split_members(predicate, &mut predicates);
 
@@ -316,12 +317,12 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 optimize(input, state)
             }
         }
-        LogicalPlan::Projection {
+        LogicalPlan::Projection(Projection {
             input,
             expr,
             schema,
             alias: _,
-        } => {
+        }) => {
             // A projection is filter-commutable, but re-writes all predicate expressions
             // collect projection.
             let projection = schema
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 320e1c7..e2c65de 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,6 +20,7 @@
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{Limit, TableScanPlan};
 use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
@@ -77,16 +78,16 @@ fn limit_push_down(
             projected_schema: projected_schema.clone(),
         })),
         (
-            LogicalPlan::Projection {
+            LogicalPlan::Projection(Projection {
                 expr,
                 input,
                 schema,
                 alias,
-            },
+            }),
             upper_limit,
         ) => {
             // Push down limit directly (projection doesn't change number of rows)
-            Ok(LogicalPlan::Projection {
+            Ok(LogicalPlan::Projection(Projection {
                 expr: expr.clone(),
                 input: Arc::new(limit_push_down(
                     optimizer,
@@ -96,7 +97,7 @@ fn limit_push_down(
                 )?),
                 schema: schema.clone(),
                 alias: alias.clone(),
-            })
+            }))
         }
         (
             LogicalPlan::Union(Union {
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index b6f0ff5..5b1f3ea 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,7 +20,7 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan};
+use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, Window};
 use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
     LogicalPlanBuilder, ToDFSchema, Union,
@@ -131,12 +131,12 @@ fn optimize_plan(
 ) -> Result<LogicalPlan> {
     let mut new_required_columns = required_columns.clone();
     match plan {
-        LogicalPlan::Projection {
+        LogicalPlan::Projection(Projection {
             input,
             expr,
             schema,
             alias,
-        } => {
+        }) => {
             // projection:
             // * remove any expression that is not required
             // * construct the new set of required columns
@@ -182,12 +182,12 @@ fn optimize_plan(
                 // no need for an expression at all
                 Ok(new_input)
             } else {
-                Ok(LogicalPlan::Projection {
+                Ok(LogicalPlan::Projection(Projection {
                     expr: new_expr,
                     input: Arc::new(new_input),
                     schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
                     alias: alias.clone(),
-                })
+                }))
             }
         }
         LogicalPlan::Join {
@@ -236,12 +236,12 @@ fn optimize_plan(
                 null_equals_null: *null_equals_null,
             })
         }
-        LogicalPlan::Window {
+        LogicalPlan::Window(Window {
             schema,
             window_expr,
             input,
             ..
-        } => {
+        }) => {
             // Gather all columns needed for expressions in this Window
             let mut new_window_expr = Vec::new();
             {
@@ -745,12 +745,12 @@ mod tests {
         let expr = vec![col("a"), col("b")];
         let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap();
         let projected_schema = DFSchema::new(projected_fields).unwrap();
-        let plan = LogicalPlan::Projection {
+        let plan = LogicalPlan::Projection(Projection {
             expr,
             input: Arc::new(table_scan),
             schema: Arc::new(projected_schema),
             alias: None,
-        };
+        });
 
         assert_fields_eq(&plan, vec!["a", "b"]);
 
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs
index f6178a2..358444d 100644
--- a/datafusion/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -19,6 +19,7 @@
 
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -124,12 +125,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
                         ));
                     });
 
-                Ok(LogicalPlan::Projection {
+                Ok(LogicalPlan::Projection(Projection {
                     expr: alias_expr,
                     input: Arc::new(final_agg),
                     schema: schema.clone(),
                     alias: Option::None,
-                })
+                }))
             } else {
                 optimize_children(plan)
             }
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 05c3629..aa559cd 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -23,7 +23,7 @@ use arrow::record_batch::RecordBatch;
 
 use super::optimizer::OptimizerRule;
 use crate::execution::context::{ExecutionContextState, ExecutionProps};
-use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
+use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, Projection, Window};
 use crate::logical_plan::{
     build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
     ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning,
@@ -145,12 +145,14 @@ pub fn from_plan(
     inputs: &[LogicalPlan],
 ) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
-            expr: expr.to_vec(),
-            input: Arc::new(inputs[0].clone()),
-            schema: schema.clone(),
-            alias: alias.clone(),
-        }),
+        LogicalPlan::Projection(Projection { schema, alias, .. }) => {
+            Ok(LogicalPlan::Projection(Projection {
+                expr: expr.to_vec(),
+                input: Arc::new(inputs[0].clone()),
+                schema: schema.clone(),
+                alias: alias.clone(),
+            }))
+        }
         LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
             values: expr
@@ -158,10 +160,10 @@ pub fn from_plan(
                 .map(|s| s.to_vec())
                 .collect::<Vec<_>>(),
         })),
-        LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
+        LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
             predicate: expr[0].clone(),
             input: Arc::new(inputs[0].clone()),
-        }),
+        })),
         LogicalPlan::Repartition(Repartition {
             partitioning_scheme,
             ..
@@ -177,15 +179,15 @@ pub fn from_plan(
                 input: Arc::new(inputs[0].clone()),
             })),
         },
-        LogicalPlan::Window {
+        LogicalPlan::Window(Window {
             window_expr,
             schema,
             ..
-        } => Ok(LogicalPlan::Window {
+        }) => Ok(LogicalPlan::Window(Window {
             input: Arc::new(inputs[0].clone()),
             window_expr: expr[0..window_expr.len()].to_vec(),
             schema: schema.clone(),
-        }),
+        })),
         LogicalPlan::Aggregate {
             group_expr, schema, ..
         } => Ok(LogicalPlan::Aggregate {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 8f7112e..7fcacd2 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,7 @@ use super::{
     hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
 };
 use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::EmptyRelation;
+use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
 use crate::logical_plan::{
     unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
     Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
@@ -365,9 +365,9 @@ impl DefaultPhysicalPlanner {
                     )?;
                     Ok(Arc::new(value_exec))
                 }
-                LogicalPlan::Window {
+                LogicalPlan::Window(Window {
                     input, window_expr, ..
-                } => {
+                }) => {
                     if window_expr.is_empty() {
                         return Err(DataFusionError::Internal(
                             "Impossibly got empty window expression".to_owned(),
@@ -571,7 +571,7 @@ impl DefaultPhysicalPlanner {
                         physical_input_schema.clone(),
                     )?) )
                 }
-                LogicalPlan::Projection { input, expr, .. } => {
+                LogicalPlan::Projection(Projection { input, expr, .. }) => {
                     let input_exec = self.create_initial_plan(input, ctx_state).await?;
                     let input_schema = input.as_ref().schema();
 
@@ -623,9 +623,9 @@ impl DefaultPhysicalPlanner {
                         input_exec,
                     )?) )
                 }
-                LogicalPlan::Filter {
+                LogicalPlan::Filter(Filter {
                     input, predicate, ..
-                } => {
+                }) => {
                     let physical_input = self.create_initial_plan(input, ctx_state).await?;
                     let input_schema = physical_input.as_ref().schema();
                     let input_dfschema = input.as_ref().schema();
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 25040a6..a145ca3 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -45,6 +45,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use async_trait::async_trait;
+use datafusion::logical_plan::plan::Projection;
 
 //// Custom source dataframe tests ////
 
@@ -216,7 +217,7 @@ async fn custom_source_dataframe() -> Result<()> {
 
     let optimized_plan = ctx.optimize(&logical_plan)?;
     match &optimized_plan {
-        LogicalPlan::Projection { input, .. } => match &**input {
+        LogicalPlan::Projection(Projection { input, .. }) => match &**input {
             LogicalPlan::TableScan(TableScanPlan {
                 source,
                 projected_schema,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e761633..24b2a49 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,6 +36,7 @@ use datafusion::assert_batches_eq;
 use datafusion::assert_batches_sorted_eq;
 use datafusion::assert_contains;
 use datafusion::assert_not_contains;
+use datafusion::logical_plan::plan::Projection;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::logical_plan::TableScanPlan;
 use datafusion::physical_plan::functions::Volatility;
@@ -90,7 +91,7 @@ async fn nyc() -> Result<()> {
     let optimized_plan = ctx.optimize(&logical_plan)?;
 
     match &optimized_plan {
-        LogicalPlan::Projection { input, .. } => match input.as_ref() {
+        LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() {
             LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
                 LogicalPlan::TableScan(TableScanPlan {
                     ref projected_schema,