You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/22 12:03:51 UTC
[arrow-datafusion] branch master updated: Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a607861 Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)
a607861 is described below
commit a607861e53de28400adc58ea82f500fc87b7fdad
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Nov 22 20:03:25 2021 +0800
Extract Projection, Filter, Window in LogicalPlan as independent struct (#1309)
* Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302)
* Extract Projection, Filter, Window in LogicalPlan as independent struct (#1302)
* solve conflicts
* solve conflicts
* solve conflicts
Co-authored-by: liuli <li...@analysys.com.cn>
---
.../rust/core/src/serde/logical_plan/to_proto.rs | 12 +--
datafusion/src/execution/context.rs | 5 +-
datafusion/src/logical_plan/builder.rs | 15 +--
datafusion/src/logical_plan/plan.rs | 110 ++++++++++++---------
.../src/optimizer/common_subexpr_eliminate.rs | 27 ++---
datafusion/src/optimizer/filter_push_down.rs | 11 ++-
datafusion/src/optimizer/limit_push_down.rs | 9 +-
datafusion/src/optimizer/projection_push_down.rs | 18 ++--
.../src/optimizer/single_distinct_to_groupby.rs | 5 +-
datafusion/src/optimizer/utils.rs | 26 ++---
datafusion/src/physical_plan/planner.rs | 12 +--
datafusion/tests/custom_sources.rs | 3 +-
datafusion/tests/sql.rs | 3 +-
13 files changed, 143 insertions(+), 113 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 9c13810..0e0b989 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -30,7 +30,7 @@ use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::EmptyRelation;
+use datafusion::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
use datafusion::logical_plan::{
exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -779,9 +779,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)))
}
}
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
expr, input, alias, ..
- } => Ok(protobuf::LogicalPlanNode {
+ }) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
protobuf::ProjectionNode {
input: Some(Box::new(input.as_ref().try_into()?)),
@@ -796,7 +796,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
))),
}),
- LogicalPlan::Filter { predicate, input } => {
+ LogicalPlan::Filter(Filter { predicate, input }) => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
@@ -807,9 +807,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::Window {
+ LogicalPlan::Window(Window {
input, window_expr, ..
- } => {
+ }) => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 5f77b2b..9e5ac1f 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1177,6 +1177,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
@@ -1415,7 +1416,7 @@ mod tests {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
- LogicalPlan::Projection { input, .. } => match &**input {
+ LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
@@ -1488,7 +1489,7 @@ mod tests {
let ctx = ExecutionContext::new();
let optimized_plan = ctx.optimize(&plan)?;
match &optimized_plan {
- LogicalPlan::Projection { input, .. } => match &**input {
+ LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 94e53d6..2eb0091 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -26,7 +26,8 @@ use crate::datasource::{
};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::plan::{
- AnalyzePlan, EmptyRelation, ExplainPlan, TableScanPlan, ToStringifiedPlan, Union,
+ AnalyzePlan, EmptyRelation, ExplainPlan, Filter, Projection, TableScanPlan,
+ ToStringifiedPlan, Union, Window,
};
use crate::prelude::*;
use crate::scalar::ScalarValue;
@@ -451,10 +452,10 @@ impl LogicalPlanBuilder {
/// Apply a filter
pub fn filter(&self, expr: impl Into<Expr>) -> Result<Self> {
let expr = normalize_col(expr.into(), &self.plan)?;
- Ok(Self::from(LogicalPlan::Filter {
+ Ok(Self::from(LogicalPlan::Filter(Filter {
predicate: expr,
input: Arc::new(self.plan.clone()),
- }))
+ })))
}
/// Apply a limit
@@ -658,11 +659,11 @@ impl LogicalPlanBuilder {
let mut window_fields: Vec<DFField> =
exprlist_to_fields(all_expr, self.plan.schema())?;
window_fields.extend_from_slice(self.plan.schema().fields());
- Ok(Self::from(LogicalPlan::Window {
+ Ok(Self::from(LogicalPlan::Window(Window {
input: Arc::new(self.plan.clone()),
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
- }))
+ })))
}
/// Apply an aggregate: grouping on the `group_expr` expressions
@@ -898,12 +899,12 @@ pub fn project_with_alias(
Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
None => input_schema,
};
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: projected_expr,
input: Arc::new(plan.clone()),
schema: DFSchemaRef::new(schema),
alias,
- })
+ }))
}
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 31de255..a7aac90 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -57,6 +57,47 @@ pub enum JoinConstraint {
Using,
}
+/// Evaluates an arbitrary list of expressions (essentially a
+/// SELECT with an expression list) on its input.
+#[derive(Clone)]
+pub struct Projection {
+ /// The list of expressions
+ pub expr: Vec<Expr>,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The schema description of the output
+ pub schema: DFSchemaRef,
+ /// Projection output relation alias
+ pub alias: Option<String>,
+}
+
+/// Filters rows from its input that do not match an
+/// expression (essentially a WHERE clause with a predicate
+/// expression).
+///
+/// Semantically, `<predicate>` is evaluated for each row of the input;
+/// If the value of `<predicate>` is true, the input row is passed to
+/// the output. If the value of `<predicate>` is false, the row is
+/// discarded.
+#[derive(Clone)]
+pub struct Filter {
+ /// The predicate expression, which must have Boolean type.
+ pub predicate: Expr,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+}
+
+/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
+#[derive(Clone)]
+pub struct Window {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The window function expression
+ pub window_expr: Vec<Expr>,
+ /// The schema description of the window output
+ pub schema: DFSchemaRef,
+}
+
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScanPlan {
@@ -214,16 +255,7 @@ pub struct Values {
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
- Projection {
- /// The list of expressions
- expr: Vec<Expr>,
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- /// The schema description of the output
- schema: DFSchemaRef,
- /// Projection output relation alias
- alias: Option<String>,
- },
+ Projection(Projection),
/// Filters rows from its input that do not match an
/// expression (essentially a WHERE clause with a predicate
/// expression).
@@ -232,21 +264,9 @@ pub enum LogicalPlan {
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
- Filter {
- /// The predicate expression, which must have Boolean type.
- predicate: Expr,
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- },
+ Filter(Filter),
/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
- Window {
- /// The incoming logical plan
- input: Arc<LogicalPlan>,
- /// The window function expression
- window_expr: Vec<Expr>,
- /// The schema description of the window output
- schema: DFSchemaRef,
- },
+ Window(Window),
/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
Aggregate {
@@ -324,9 +344,9 @@ impl LogicalPlan {
LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
}) => projected_schema,
- LogicalPlan::Projection { schema, .. } => schema,
- LogicalPlan::Filter { input, .. } => input.schema(),
- LogicalPlan::Window { schema, .. } => schema,
+ LogicalPlan::Projection(Projection { schema, .. }) => schema,
+ LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
+ LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate { schema, .. } => schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => schema,
@@ -354,9 +374,9 @@ impl LogicalPlan {
projected_schema, ..
}) => vec![projected_schema],
LogicalPlan::Values(Values { schema, .. }) => vec![schema],
- LogicalPlan::Window { input, schema, .. }
+ LogicalPlan::Window(Window { input, schema, .. })
| LogicalPlan::Aggregate { input, schema, .. }
- | LogicalPlan::Projection { input, schema, .. } => {
+ | LogicalPlan::Projection(Projection { input, schema, .. }) => {
let mut schemas = input.all_schemas();
schemas.insert(0, schema);
schemas
@@ -391,7 +411,7 @@ impl LogicalPlan {
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
- | LogicalPlan::Filter { input, .. } => input.all_schemas(),
+ | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
}
@@ -409,11 +429,11 @@ impl LogicalPlan {
/// children
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
match self {
- LogicalPlan::Projection { expr, .. } => expr.clone(),
+ LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
LogicalPlan::Values(Values { values, .. }) => {
values.iter().flatten().cloned().collect()
}
- LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
+ LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
@@ -421,7 +441,7 @@ impl LogicalPlan {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
- LogicalPlan::Window { window_expr, .. } => window_expr.clone(),
+ LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
LogicalPlan::Aggregate {
group_expr,
aggr_expr,
@@ -453,10 +473,10 @@ impl LogicalPlan {
/// include inputs to inputs.
pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
match self {
- LogicalPlan::Projection { input, .. } => vec![input],
- LogicalPlan::Filter { input, .. } => vec![input],
+ LogicalPlan::Projection(Projection { input, .. }) => vec![input],
+ LogicalPlan::Filter(Filter { input, .. }) => vec![input],
LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
- LogicalPlan::Window { input, .. } => vec![input],
+ LogicalPlan::Window(Window { input, .. }) => vec![input],
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
@@ -588,12 +608,12 @@ impl LogicalPlan {
}
let recurse = match self {
- LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
- LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
+ LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
LogicalPlan::Repartition(Repartition { input, .. }) => {
input.accept(visitor)?
}
- LogicalPlan::Window { input, .. } => input.accept(visitor)?,
+ LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. }
@@ -856,9 +876,9 @@ impl LogicalPlan {
Ok(())
}
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
ref expr, alias, ..
- } => {
+ }) => {
write!(f, "Projection: ")?;
for (i, expr_item) in expr.iter().enumerate() {
if i > 0 {
@@ -871,13 +891,13 @@ impl LogicalPlan {
}
Ok(())
}
- LogicalPlan::Filter {
+ LogicalPlan::Filter(Filter {
predicate: ref expr,
..
- } => write!(f, "Filter: {:?}", expr),
- LogicalPlan::Window {
+ }) => write!(f, "Filter: {:?}", expr),
+ LogicalPlan::Window(Window {
ref window_expr, ..
- } => {
+ }) => {
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
}
LogicalPlan::Aggregate {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 930f0b8..ced2c0c 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -19,6 +19,7 @@
use crate::error::Result;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Projection, Window};
use crate::logical_plan::{
col, DFField, DFSchema, Expr, ExprRewriter, ExpressionVisitor, LogicalPlan,
Recursion, RewriteRecursion,
@@ -77,12 +78,12 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
let mut expr_set = ExprSet::new();
match plan {
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
expr,
input,
schema,
alias,
- } => {
+ }) => {
let arrays = to_arrays(expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
@@ -94,14 +95,14 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
execution_props,
)?;
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: new_expr.pop().unwrap(),
input: Arc::new(new_input),
schema: schema.clone(),
alias: alias.clone(),
- })
+ }))
}
- LogicalPlan::Filter { predicate, input } => {
+ LogicalPlan::Filter(Filter { predicate, input }) => {
let schemas = plan.all_schemas();
let all_schema =
schemas.into_iter().fold(DFSchema::empty(), |mut lhs, rhs| {
@@ -122,16 +123,16 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
execution_props,
)?;
- Ok(LogicalPlan::Filter {
+ Ok(LogicalPlan::Filter(Filter {
predicate: new_expr.pop().unwrap().pop().unwrap(),
input: Arc::new(new_input),
- })
+ }))
}
- LogicalPlan::Window {
+ LogicalPlan::Window(Window {
input,
window_expr,
schema,
- } => {
+ }) => {
let arrays = to_arrays(window_expr, input, &mut expr_set)?;
let (mut new_expr, new_input) = rewrite_expr(
@@ -143,11 +144,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
execution_props,
)?;
- Ok(LogicalPlan::Window {
+ Ok(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: new_expr.pop().unwrap(),
schema: schema.clone(),
- })
+ }))
}
LogicalPlan::Aggregate {
input,
@@ -265,12 +266,12 @@ fn build_project_plan(
let mut schema = DFSchema::new(fields)?;
schema.merge(input.schema());
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: project_exprs,
input: Arc::new(input),
schema: Arc::new(schema),
alias: None,
- })
+ }))
}
#[inline]
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 7e5c2c1..4214a57 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,6 +16,7 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::{Filter, Projection};
use crate::logical_plan::{
and, replace_col, Column, CrossJoin, Limit, LogicalPlan, TableScanPlan,
};
@@ -181,10 +182,10 @@ fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
and(acc, (*predicate).to_owned())
});
- LogicalPlan::Filter {
+ LogicalPlan::Filter(Filter {
predicate,
input: Arc::new(plan),
- }
+ })
}
// remove all filters from `filters` that are in `predicate_columns`
@@ -287,7 +288,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
push_down(&state, plan)
}
LogicalPlan::Analyze { .. } => push_down(&state, plan),
- LogicalPlan::Filter { input, predicate } => {
+ LogicalPlan::Filter(Filter { input, predicate }) => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
@@ -316,12 +317,12 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
optimize(input, state)
}
}
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
input,
expr,
schema,
alias: _,
- } => {
+ }) => {
// A projection is filter-commutable, but re-writes all predicate expressions
// collect projection.
let projection = schema
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index 320e1c7..e2c65de 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,6 +20,7 @@
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScanPlan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
@@ -77,16 +78,16 @@ fn limit_push_down(
projected_schema: projected_schema.clone(),
})),
(
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
expr,
input,
schema,
alias,
- },
+ }),
upper_limit,
) => {
// Push down limit directly (projection doesn't change number of rows)
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: expr.clone(),
input: Arc::new(limit_push_down(
optimizer,
@@ -96,7 +97,7 @@ fn limit_push_down(
)?),
schema: schema.clone(),
alias: alias.clone(),
- })
+ }))
}
(
LogicalPlan::Union(Union {
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index b6f0ff5..5b1f3ea 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,7 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{AnalyzePlan, TableScanPlan};
+use crate::logical_plan::plan::{AnalyzePlan, Projection, TableScanPlan, Window};
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
LogicalPlanBuilder, ToDFSchema, Union,
@@ -131,12 +131,12 @@ fn optimize_plan(
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
- LogicalPlan::Projection {
+ LogicalPlan::Projection(Projection {
input,
expr,
schema,
alias,
- } => {
+ }) => {
// projection:
// * remove any expression that is not required
// * construct the new set of required columns
@@ -182,12 +182,12 @@ fn optimize_plan(
// no need for an expression at all
Ok(new_input)
} else {
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: new_expr,
input: Arc::new(new_input),
schema: DFSchemaRef::new(DFSchema::new(new_fields)?),
alias: alias.clone(),
- })
+ }))
}
}
LogicalPlan::Join {
@@ -236,12 +236,12 @@ fn optimize_plan(
null_equals_null: *null_equals_null,
})
}
- LogicalPlan::Window {
+ LogicalPlan::Window(Window {
schema,
window_expr,
input,
..
- } => {
+ }) => {
// Gather all columns needed for expressions in this Window
let mut new_window_expr = Vec::new();
{
@@ -745,12 +745,12 @@ mod tests {
let expr = vec![col("a"), col("b")];
let projected_fields = exprlist_to_fields(&expr, input_schema).unwrap();
let projected_schema = DFSchema::new(projected_fields).unwrap();
- let plan = LogicalPlan::Projection {
+ let plan = LogicalPlan::Projection(Projection {
expr,
input: Arc::new(table_scan),
schema: Arc::new(projected_schema),
alias: None,
- };
+ });
assert_fields_eq(&plan, vec!["a", "b"]);
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs
index f6178a2..358444d 100644
--- a/datafusion/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -19,6 +19,7 @@
use crate::error::Result;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::Projection;
use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -124,12 +125,12 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
));
});
- Ok(LogicalPlan::Projection {
+ Ok(LogicalPlan::Projection(Projection {
expr: alias_expr,
input: Arc::new(final_agg),
schema: schema.clone(),
alias: Option::None,
- })
+ }))
} else {
optimize_children(plan)
}
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 05c3629..aa559cd 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -23,7 +23,7 @@ use arrow::record_batch::RecordBatch;
use super::optimizer::OptimizerRule;
use crate::execution::context::{ExecutionContextState, ExecutionProps};
-use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
+use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan, Filter, Projection, Window};
use crate::logical_plan::{
build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
ExprRewriter, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning,
@@ -145,12 +145,14 @@ pub fn from_plan(
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
- LogicalPlan::Projection { schema, alias, .. } => Ok(LogicalPlan::Projection {
- expr: expr.to_vec(),
- input: Arc::new(inputs[0].clone()),
- schema: schema.clone(),
- alias: alias.clone(),
- }),
+ LogicalPlan::Projection(Projection { schema, alias, .. }) => {
+ Ok(LogicalPlan::Projection(Projection {
+ expr: expr.to_vec(),
+ input: Arc::new(inputs[0].clone()),
+ schema: schema.clone(),
+ alias: alias.clone(),
+ }))
+ }
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
values: expr
@@ -158,10 +160,10 @@ pub fn from_plan(
.map(|s| s.to_vec())
.collect::<Vec<_>>(),
})),
- LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
+ LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter(Filter {
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
- }),
+ })),
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
@@ -177,15 +179,15 @@ pub fn from_plan(
input: Arc::new(inputs[0].clone()),
})),
},
- LogicalPlan::Window {
+ LogicalPlan::Window(Window {
window_expr,
schema,
..
- } => Ok(LogicalPlan::Window {
+ }) => Ok(LogicalPlan::Window(Window {
input: Arc::new(inputs[0].clone()),
window_expr: expr[0..window_expr.len()].to_vec(),
schema: schema.clone(),
- }),
+ })),
LogicalPlan::Aggregate {
group_expr, schema, ..
} => Ok(LogicalPlan::Aggregate {
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 8f7112e..7fcacd2 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,7 @@ use super::{
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::EmptyRelation;
+use crate::logical_plan::plan::{EmptyRelation, Filter, Projection, Window};
use crate::logical_plan::{
unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
@@ -365,9 +365,9 @@ impl DefaultPhysicalPlanner {
)?;
Ok(Arc::new(value_exec))
}
- LogicalPlan::Window {
+ LogicalPlan::Window(Window {
input, window_expr, ..
- } => {
+ }) => {
if window_expr.is_empty() {
return Err(DataFusionError::Internal(
"Impossibly got empty window expression".to_owned(),
@@ -571,7 +571,7 @@ impl DefaultPhysicalPlanner {
physical_input_schema.clone(),
)?) )
}
- LogicalPlan::Projection { input, expr, .. } => {
+ LogicalPlan::Projection(Projection { input, expr, .. }) => {
let input_exec = self.create_initial_plan(input, ctx_state).await?;
let input_schema = input.as_ref().schema();
@@ -623,9 +623,9 @@ impl DefaultPhysicalPlanner {
input_exec,
)?) )
}
- LogicalPlan::Filter {
+ LogicalPlan::Filter(Filter {
input, predicate, ..
- } => {
+ }) => {
let physical_input = self.create_initial_plan(input, ctx_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = input.as_ref().schema();
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 25040a6..a145ca3 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -45,6 +45,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
+use datafusion::logical_plan::plan::Projection;
//// Custom source dataframe tests ////
@@ -216,7 +217,7 @@ async fn custom_source_dataframe() -> Result<()> {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
- LogicalPlan::Projection { input, .. } => match &**input {
+ LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index e761633..24b2a49 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,6 +36,7 @@ use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::assert_contains;
use datafusion::assert_not_contains;
+use datafusion::logical_plan::plan::Projection;
use datafusion::logical_plan::LogicalPlan;
use datafusion::logical_plan::TableScanPlan;
use datafusion::physical_plan::functions::Volatility;
@@ -90,7 +91,7 @@ async fn nyc() -> Result<()> {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
- LogicalPlan::Projection { input, .. } => match input.as_ref() {
+ LogicalPlan::Projection(Projection { input, .. }) => match input.as_ref() {
LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
LogicalPlan::TableScan(TableScanPlan {
ref projected_schema,