You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/14 17:38:21 UTC
[arrow-datafusion] branch master updated: add `try_optimize()` for all rules. (#4599)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 508ba80f4 add `try_optimize()` for all rules. (#4599)
508ba80f4 is described below
commit 508ba80f417d671038f0847e116324baa0ec1a5b
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Dec 15 01:38:15 2022 +0800
add `try_optimize()` for all rules. (#4599)
---
.../optimizer/src/common_subexpr_eliminate.rs | 58 +++++++++++------
.../optimizer/src/decorrelate_where_exists.rs | 14 ++--
datafusion/optimizer/src/decorrelate_where_in.rs | 34 +++++++---
datafusion/optimizer/src/eliminate_cross_join.rs | 35 +++++++---
datafusion/optimizer/src/eliminate_filter.rs | 22 +++++--
datafusion/optimizer/src/eliminate_limit.rs | 26 ++++++--
datafusion/optimizer/src/eliminate_outer_join.rs | 28 +++++++-
datafusion/optimizer/src/filter_null_join_keys.rs | 28 ++++++--
datafusion/optimizer/src/inline_table_scan.rs | 20 +++++-
.../optimizer/src/propagate_empty_relation.rs | 54 +++++++++-------
datafusion/optimizer/src/push_down_filter.rs | 40 ++++++++++--
datafusion/optimizer/src/push_down_limit.rs | 34 ++++++++--
datafusion/optimizer/src/push_down_projection.rs | 18 +++++-
.../optimizer/src/rewrite_disjunctive_predicate.rs | 20 +++++-
.../optimizer/src/scalar_subquery_to_join.rs | 26 ++++++--
.../optimizer/src/single_distinct_to_groupby.rs | 36 ++++++++---
.../optimizer/src/subquery_filter_to_join.rs | 28 ++++++--
datafusion/optimizer/src/type_coercion.rs | 14 +++-
.../optimizer/src/unwrap_cast_in_comparison.rs | 75 +++++++++++++---------
19 files changed, 460 insertions(+), 150 deletions(-)
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 482298e16..b541aa0ee 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -79,7 +79,9 @@ impl CommonSubexprEliminate {
})
.collect::<Result<Vec<_>>>()?;
- let mut new_input = self.optimize(input, optimizer_config)?;
+ let mut new_input = self
+ .try_optimize(input, optimizer_config)?
+ .unwrap_or_else(|| input.clone());
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
@@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
let mut expr_set = ExprSet::new();
match plan {
@@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;
- Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
- pop_expr(&mut new_expr)?,
- Arc::new(new_input),
- schema.clone(),
- )?))
+ Ok(Some(LogicalPlan::Projection(
+ Projection::try_new_with_schema(
+ pop_expr(&mut new_expr)?,
+ Arc::new(new_input),
+ schema.clone(),
+ )?,
+ )))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
@@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate {
)?;
if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
- Ok(LogicalPlan::Filter(Filter::try_new(
+ Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
Arc::new(new_input),
- )?))
+ )?)))
} else {
Err(DataFusionError::Internal(
"Failed to pop predicate expr".to_string(),
@@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;
- Ok(LogicalPlan::Window(Window {
+ Ok(Some(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
- }))
+ })))
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
@@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate {
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;
- Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
- Arc::new(new_input),
- new_group_expr,
- new_aggr_expr,
- schema.clone(),
- )?))
+ Ok(Some(LogicalPlan::Aggregate(
+ Aggregate::try_new_with_schema(
+ Arc::new(new_input),
+ new_group_expr,
+ new_aggr_expr,
+ schema.clone(),
+ )?,
+ )))
}
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
let input_schema = Arc::clone(input.schema());
@@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;
- Ok(LogicalPlan::Sort(Sort {
+ Ok(Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
input: Arc::new(new_input),
fetch: *fetch,
- }))
+ })))
}
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
@@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index 9cf9138bc..6b5bd184f 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -57,8 +57,10 @@ impl DecorrelateWhereExists {
for it in filters.iter() {
match it {
Expr::Exists { subquery, negated } => {
- let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
- let subquery = Arc::new(subquery);
+ let subquery = self
+ .try_optimize(&subquery.subquery, optimizer_config)?
+ .map(Arc::new)
+ .unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery = SubqueryInfo::new(subquery.clone(), *negated);
subqueries.push(subquery);
@@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
- let filter_input = filter.input();
+ let filter_input = filter.input().as_ref();
// Apply optimizer rule to current input
- let optimized_input = self.optimize(filter_input, optimizer_config)?;
+ let optimized_input = self
+ .try_optimize(filter_input, optimizer_config)?
+ .unwrap_or_else(|| filter_input.clone());
let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists {
}
// iterate through all exists clauses in predicate, turning each into a join
- let mut cur_input = (**filter_input).clone();
+ let mut cur_input = filter_input.clone();
for subquery in subqueries {
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
{
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index 1818e5897..4614b0699 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -59,8 +59,10 @@ impl DecorrelateWhereIn {
subquery,
negated,
} => {
- let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
- let subquery = Arc::new(subquery);
+ let subquery = self
+ .try_optimize(&subquery.subquery, optimizer_config)?
+ .map(Arc::new)
+ .unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery =
SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated);
@@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> datafusion_common::Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
- let filter_input = filter.input();
+ let filter_input = filter.input().as_ref();
// Apply optimizer rule to current input
- let optimized_input = self.optimize(filter_input, optimizer_config)?;
+ let optimized_input = self
+ .try_optimize(filter_input, optimizer_config)?
+ .unwrap_or_else(|| filter_input.clone());
let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
- return Ok(optimized_plan);
+ return Ok(Some(optimized_plan));
}
// iterate through all exists clauses in predicate, turning each into a join
- let mut cur_input = (**filter_input).clone();
+ let mut cur_input = filter_input.clone();
for subquery in subqueries {
cur_input = optimize_where_in(
&subquery,
@@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn {
optimizer_config,
)?;
}
- Ok(cur_input)
+ Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index 8ca457771..b431742f0 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, _optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let input = (**filter.input()).clone();
@@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
_ => {
- return utils::optimize_children(self, plan, _optimizer_config);
+ return Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?));
}
}
@@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin {
// if there are no join keys then do nothing.
if all_join_keys.is_empty() {
- Ok(LogicalPlan::Filter(Filter::try_new(
+ Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(left),
- )?))
+ )?)))
} else {
// remove join expressions from filter
match remove_join_expressions(predicate, &all_join_keys)? {
- Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
- filter_expr,
- Arc::new(left),
- )?)),
- _ => Ok(left),
+ Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
+ Filter::try_new(filter_expr, Arc::new(left))?,
+ ))),
+ _ => Ok(Some(left)),
}
}
}
- _ => utils::optimize_children(self, plan, _optimizer_config),
+ _ => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?)),
}
}
diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs
index d5cbcf394..3e1c91a70 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, _optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
@@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter {
};
match predicate_and_input {
- Some((true, input)) => self.optimize(input, _optimizer_config),
- Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ Some((true, input)) => self.try_optimize(input, _optimizer_config),
+ Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
- })),
+ }))),
None => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, _optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 9ad39d1db..b60b16e83 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -42,25 +42,43 @@ impl OptimizerRule for EliminateLimit {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
- return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
- }));
+ })));
}
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
- return utils::optimize_children(self, input, optimizer_config);
+ return Ok(Some(utils::optimize_children(
+ self,
+ input,
+ optimizer_config,
+ )?));
}
}
}
}
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 9be661624..4d1fecad7 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -67,6 +67,16 @@ impl OptimizerRule for EliminateOuterJoin {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => match filter.input().as_ref() {
LogicalPlan::Join(join) => {
@@ -110,11 +120,23 @@ impl OptimizerRule for EliminateOuterJoin {
null_equals_null: join.null_equals_null,
});
let new_plan = from_plan(plan, &plan.expressions(), &[new_join])?;
- utils::optimize_children(self, &new_plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ &new_plan,
+ optimizer_config,
+ )?))
}
- _ => utils::optimize_children(self, plan, optimizer_config),
+ _ => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?)),
},
- _ => utils::optimize_children(self, plan, optimizer_config),
+ _ => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?)),
}
}
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs
index be33c796e..f1a4f1393 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -40,12 +40,28 @@ impl OptimizerRule for FilterNullJoinKeys {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> datafusion_common::Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
// recurse down first and optimize inputs
let mut join = join.clone();
- join.left = Arc::new(self.optimize(&join.left, optimizer_config)?);
- join.right = Arc::new(self.optimize(&join.right, optimizer_config)?);
+ join.left = Arc::new(
+ self.try_optimize(&join.left, optimizer_config)?
+ .unwrap_or_else(|| join.left.as_ref().clone()),
+ );
+ join.right = Arc::new(
+ self.try_optimize(&join.right, optimizer_config)?
+ .unwrap_or_else(|| join.right.as_ref().clone()),
+ );
let left_schema = join.left.schema();
let right_schema = join.right.schema();
@@ -80,11 +96,15 @@ impl OptimizerRule for FilterNullJoinKeys {
join.right.clone(),
)?));
}
- Ok(LogicalPlan::Join(join))
+ Ok(Some(LogicalPlan::Join(join)))
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
index 7fdb594be..e0238a751 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -40,6 +40,16 @@ impl OptimizerRule for InlineTableScan {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, _optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
@@ -57,17 +67,21 @@ impl OptimizerRule for InlineTableScan {
let plan = LogicalPlanBuilder::from(plan)
.project(vec![Expr::Wildcard])?
.alias(table_name)?;
- plan.build()
+ Ok(Some(plan.build()?))
} else {
// No plan available, return with table scan as is
- Ok(plan.clone())
+ Ok(Some(plan.clone()))
}
}
// Rest: Recurse
_ => {
// apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, _optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs
index 2de7e72ef..cfdb354d4 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -39,11 +39,21 @@ impl OptimizerRule for PropagateEmptyRelation {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
// optimize child plans first
let optimized_children_plan =
utils::optimize_children(self, plan, optimizer_config)?;
match &optimized_children_plan {
- LogicalPlan::EmptyRelation(_) => Ok(optimized_children_plan),
+ LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)),
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_)
| LogicalPlan::Window(_)
@@ -51,19 +61,19 @@ impl OptimizerRule for PropagateEmptyRelation {
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Limit(_) => match empty_child(&optimized_children_plan)? {
- Some(empty) => Ok(empty),
- None => Ok(optimized_children_plan),
+ Some(empty) => Ok(Some(empty)),
+ None => Ok(Some(optimized_children_plan)),
},
LogicalPlan::CrossJoin(_) => {
let (left_empty, right_empty) =
binary_plan_children_is_empty(&optimized_children_plan)?;
if left_empty || right_empty {
- Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: optimized_children_plan.schema().clone(),
- }))
+ })))
} else {
- Ok(optimized_children_plan)
+ Ok(Some(optimized_children_plan))
}
}
LogicalPlan::Join(join) => {
@@ -83,15 +93,15 @@ impl OptimizerRule for PropagateEmptyRelation {
let (left_empty, right_empty) =
binary_plan_children_is_empty(&optimized_children_plan)?;
if left_empty || right_empty {
- Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: optimized_children_plan.schema().clone(),
- }))
+ })))
} else {
- Ok(optimized_children_plan)
+ Ok(Some(optimized_children_plan))
}
} else {
- Ok(optimized_children_plan)
+ Ok(Some(optimized_children_plan))
}
}
LogicalPlan::Union(union) => {
@@ -106,40 +116,40 @@ impl OptimizerRule for PropagateEmptyRelation {
.collect::<Vec<_>>();
if new_inputs.len() == union.inputs.len() {
- Ok(optimized_children_plan)
+ Ok(Some(optimized_children_plan))
} else if new_inputs.is_empty() {
- Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: optimized_children_plan.schema().clone(),
- }))
+ })))
} else if new_inputs.len() == 1 {
let child = (**(union.inputs.get(0).unwrap())).clone();
if child.schema().eq(optimized_children_plan.schema()) {
- Ok(child)
+ Ok(Some(child))
} else {
- Ok(LogicalPlan::Projection(Projection::new_from_schema(
+ Ok(Some(LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(child),
optimized_children_plan.schema().clone(),
- )))
+ ))))
}
} else {
- Ok(LogicalPlan::Union(Union {
+ Ok(Some(LogicalPlan::Union(Union {
inputs: new_inputs,
schema: union.schema.clone(),
- }))
+ })))
}
}
LogicalPlan::Aggregate(agg) => {
if !agg.group_expr.is_empty() {
match empty_child(&optimized_children_plan)? {
- Some(empty) => Ok(empty),
- None => Ok(optimized_children_plan),
+ Some(empty) => Ok(Some(empty)),
+ None => Ok(Some(optimized_children_plan)),
}
} else {
- Ok(optimized_children_plan)
+ Ok(Some(optimized_children_plan))
}
}
- _ => Ok(optimized_children_plan),
+ _ => Ok(Some(optimized_children_plan)),
}
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 61051a5a5..40ed00ebd 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -508,19 +508,41 @@ impl OptimizerRule for PushDownFilter {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
let filter = match plan {
LogicalPlan::Filter(filter) => filter,
// we also need to pushdown filter in Join.
LogicalPlan::Join(join) => {
let optimized_plan = push_down_join(plan, join, None)?;
return match optimized_plan {
- Some(optimized_plan) => {
- utils::optimize_children(self, &optimized_plan, optimizer_config)
- }
- None => utils::optimize_children(self, plan, optimizer_config),
+ Some(optimized_plan) => Ok(Some(utils::optimize_children(
+ self,
+ &optimized_plan,
+ optimizer_config,
+ )?)),
+ None => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?)),
};
}
- _ => return utils::optimize_children(self, plan, optimizer_config),
+ _ => {
+ return Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
+ }
};
let child_plan = &**filter.input();
@@ -532,7 +554,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
child_filter.input().clone(),
)?);
- return self.optimize(&new_plan, optimizer_config);
+ return self.try_optimize(&new_plan, optimizer_config);
}
LogicalPlan::Repartition(_)
| LogicalPlan::Distinct(_)
@@ -733,7 +755,11 @@ impl OptimizerRule for PushDownFilter {
_ => plan.clone(),
};
- utils::optimize_children(self, &new_plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ &new_plan,
+ optimizer_config,
+ )?))
}
}
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 3ce4f441f..a404762b6 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -80,9 +80,25 @@ impl OptimizerRule for PushDownLimit {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
- _ => return utils::optimize_children(self, plan, optimizer_config),
+ _ => {
+ return Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
+ }
};
if let LogicalPlan::Limit(child_limit) = &*limit.input {
@@ -112,12 +128,18 @@ impl OptimizerRule for PushDownLimit {
fetch: new_fetch,
input: Arc::new((*child_limit.input).clone()),
});
- return self.optimize(&plan, optimizer_config);
+ return self.try_optimize(&plan, optimizer_config);
}
let fetch = match limit.fetch {
Some(fetch) => fetch,
- None => return utils::optimize_children(self, plan, optimizer_config),
+ None => {
+ return Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
+ }
};
let skip = limit.skip;
@@ -225,7 +247,11 @@ impl OptimizerRule for PushDownLimit {
_ => plan.clone(),
};
- utils::optimize_children(self, &plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ &plan,
+ optimizer_config,
+ )?))
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 2d156d1ce..0238148c3 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -51,6 +51,16 @@ impl OptimizerRule for PushDownProjection {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
// set of all columns referred by the plan (and thus considered required by the root)
let required_columns = plan
.schema()
@@ -58,7 +68,13 @@ impl OptimizerRule for PushDownProjection {
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();
- optimize_plan(self, plan, &required_columns, false, optimizer_config)
+ Ok(Some(optimize_plan(
+ self,
+ plan,
+ &required_columns,
+ false,
+ optimizer_config,
+ )?))
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
index 6a2ba2bde..3f6cc763b 100644
--- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
+++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
@@ -129,17 +129,31 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, _optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = predicate(filter.predicate())?;
let rewritten_predicate = rewrite_predicate(predicate);
let rewritten_expr = normalize_predicate(rewritten_predicate);
- Ok(LogicalPlan::Filter(Filter::try_new(
+ Ok(Some(LogicalPlan::Filter(Filter::try_new(
rewritten_expr,
Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?),
- )?))
+ )?)))
}
- _ => utils::optimize_children(self, plan, _optimizer_config),
+ _ => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?)),
}
}
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index f5a35c625..00e4d89c8 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -94,6 +94,16 @@ impl OptimizerRule for ScalarSubqueryToJoin {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
@@ -104,10 +114,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
- return Ok(LogicalPlan::Filter(Filter::try_new(
+ return Ok(Some(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
Arc::new(optimized_input),
- )?));
+ )?)));
}
// iterate through all subqueries in predicate, turning each into a join
@@ -122,17 +132,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
cur_input = optimized_subquery;
} else {
// if we can't handle all of the subqueries then bail for now
- return Ok(LogicalPlan::Filter(Filter::try_new(
+ return Ok(Some(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
Arc::new(optimized_input),
- )?));
+ )?)));
}
}
- Ok(cur_input)
+ Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 13cf6d3a8..1e1322395 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -86,8 +86,18 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fn optimize(
&self,
plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
+ optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Aggregate(Aggregate {
input,
@@ -192,16 +202,26 @@ impl OptimizerRule for SingleDistinctToGroupBy {
new_aggr_exprs,
)?);
- Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
- alias_expr,
- Arc::new(outer_aggr),
- schema.clone(),
- )?))
+ Ok(Some(LogicalPlan::Projection(
+ Projection::try_new_with_schema(
+ alias_expr,
+ Arc::new(outer_aggr),
+ schema.clone(),
+ )?,
+ )))
} else {
- utils::optimize_children(self, plan, _optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?))
}
}
- _ => utils::optimize_children(self, plan, _optimizer_config),
+ _ => Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ _optimizer_config,
+ )?)),
}
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index c077ce7e8..ccf6931d3 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -54,6 +54,16 @@ impl OptimizerRule for SubqueryFilterToJoin {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
@@ -78,10 +88,10 @@ impl OptimizerRule for SubqueryFilterToJoin {
})?;
if !subqueries_in_regular.is_empty() {
- return Ok(LogicalPlan::Filter(Filter::try_new(
+ return Ok(Some(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
Arc::new(optimized_input),
- )?));
+ )?)));
};
// Add subquery joins to new_input
@@ -150,23 +160,27 @@ impl OptimizerRule for SubqueryFilterToJoin {
let new_input = match opt_result {
Ok(plan) => plan,
Err(_) => {
- return Ok(LogicalPlan::Filter(Filter::try_new(
+ return Ok(Some(LogicalPlan::Filter(Filter::try_new(
filter.predicate().clone(),
Arc::new(optimized_input),
- )?))
+ )?)))
}
};
// Apply regular filters to join output if some or just return join
if regular_filters.is_empty() {
- Ok(new_input)
+ Ok(Some(new_input))
} else {
- utils::add_filter(new_input, &regular_filters)
+ Ok(Some(utils::add_filter(new_input, &regular_filters)?))
}
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs
index 698abff94..b3e1f4e75 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -58,9 +58,19 @@ impl OptimizerRule for TypeCoercion {
fn optimize(
&self,
plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
+ optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
- optimize_internal(&DFSchema::empty(), plan)
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ Ok(Some(optimize_internal(&DFSchema::empty(), plan)?))
}
}
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 8e17c184e..bfcb0f85a 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -84,42 +84,55 @@ impl OptimizerRule for UnwrapCastInComparison {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
- optimize(plan)
+ Ok(self
+ .try_optimize(plan, _optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
}
- fn name(&self) -> &str {
- "unwrap_cast_in_comparison"
- }
-}
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ let new_inputs = plan
+ .inputs()
+ .into_iter()
+ .map(|input| {
+ self.try_optimize(input, _optimizer_config)
+ .map(|o| o.unwrap_or_else(|| input.clone()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let mut schema = new_inputs.iter().map(|input| input.schema()).fold(
+ DFSchema::empty(),
+ |mut lhs, rhs| {
+ lhs.merge(rhs);
+ lhs
+ },
+ );
-fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
- let new_inputs = plan
- .inputs()
- .iter()
- .map(|input| optimize(input))
- .collect::<Result<Vec<_>>>()?;
-
- let mut schema = new_inputs.iter().map(|input| input.schema()).fold(
- DFSchema::empty(),
- |mut lhs, rhs| {
- lhs.merge(rhs);
- lhs
- },
- );
-
- schema.merge(plan.schema());
-
- let mut expr_rewriter = UnwrapCastExprRewriter {
- schema: Arc::new(schema),
- };
+ schema.merge(plan.schema());
+
+ let mut expr_rewriter = UnwrapCastExprRewriter {
+ schema: Arc::new(schema),
+ };
- let new_exprs = plan
- .expressions()
- .into_iter()
- .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
- .collect::<Result<Vec<_>>>()?;
+ let new_exprs = plan
+ .expressions()
+ .into_iter()
+ .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Some(from_plan(
+ plan,
+ new_exprs.as_slice(),
+ new_inputs.as_slice(),
+ )?))
+ }
- from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice())
+ fn name(&self) -> &str {
+ "unwrap_cast_in_comparison"
+ }
}
struct UnwrapCastExprRewriter {