You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/14 17:38:21 UTC

[arrow-datafusion] branch master updated: add `try_optimize()` for all rules. (#4599)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 508ba80f4 add `try_optimize()` for all rules. (#4599)
508ba80f4 is described below

commit 508ba80f417d671038f0847e116324baa0ec1a5b
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Dec 15 01:38:15 2022 +0800

    add `try_optimize()` for all rules. (#4599)
---
 .../optimizer/src/common_subexpr_eliminate.rs      | 58 +++++++++++------
 .../optimizer/src/decorrelate_where_exists.rs      | 14 ++--
 datafusion/optimizer/src/decorrelate_where_in.rs   | 34 +++++++---
 datafusion/optimizer/src/eliminate_cross_join.rs   | 35 +++++++---
 datafusion/optimizer/src/eliminate_filter.rs       | 22 +++++--
 datafusion/optimizer/src/eliminate_limit.rs        | 26 ++++++--
 datafusion/optimizer/src/eliminate_outer_join.rs   | 28 +++++++-
 datafusion/optimizer/src/filter_null_join_keys.rs  | 28 ++++++--
 datafusion/optimizer/src/inline_table_scan.rs      | 20 +++++-
 .../optimizer/src/propagate_empty_relation.rs      | 54 +++++++++-------
 datafusion/optimizer/src/push_down_filter.rs       | 40 ++++++++++--
 datafusion/optimizer/src/push_down_limit.rs        | 34 ++++++++--
 datafusion/optimizer/src/push_down_projection.rs   | 18 +++++-
 .../optimizer/src/rewrite_disjunctive_predicate.rs | 20 +++++-
 .../optimizer/src/scalar_subquery_to_join.rs       | 26 ++++++--
 .../optimizer/src/single_distinct_to_groupby.rs    | 36 ++++++++---
 .../optimizer/src/subquery_filter_to_join.rs       | 28 ++++++--
 datafusion/optimizer/src/type_coercion.rs          | 14 +++-
 .../optimizer/src/unwrap_cast_in_comparison.rs     | 75 +++++++++++++---------
 19 files changed, 460 insertions(+), 150 deletions(-)

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 482298e16..b541aa0ee 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -79,7 +79,9 @@ impl CommonSubexprEliminate {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        let mut new_input = self.optimize(input, optimizer_config)?;
+        let mut new_input = self
+            .try_optimize(input, optimizer_config)?
+            .unwrap_or_else(|| input.clone());
         if !affected_id.is_empty() {
             new_input = build_project_plan(new_input, affected_id, expr_set)?;
         }
@@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         let mut expr_set = ExprSet::new();
 
         match plan {
@@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate {
                     optimizer_config,
                 )?;
 
-                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
-                    pop_expr(&mut new_expr)?,
-                    Arc::new(new_input),
-                    schema.clone(),
-                )?))
+                Ok(Some(LogicalPlan::Projection(
+                    Projection::try_new_with_schema(
+                        pop_expr(&mut new_expr)?,
+                        Arc::new(new_input),
+                        schema.clone(),
+                    )?,
+                )))
             }
             LogicalPlan::Filter(filter) => {
                 let input = filter.input();
@@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate {
                 )?;
 
                 if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
-                    Ok(LogicalPlan::Filter(Filter::try_new(
+                    Ok(Some(LogicalPlan::Filter(Filter::try_new(
                         predicate,
                         Arc::new(new_input),
-                    )?))
+                    )?)))
                 } else {
                     Err(DataFusionError::Internal(
                         "Failed to pop predicate expr".to_string(),
@@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                     optimizer_config,
                 )?;
 
-                Ok(LogicalPlan::Window(Window {
+                Ok(Some(LogicalPlan::Window(Window {
                     input: Arc::new(new_input),
                     window_expr: pop_expr(&mut new_expr)?,
                     schema: schema.clone(),
-                }))
+                })))
             }
             LogicalPlan::Aggregate(Aggregate {
                 group_expr,
@@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let new_aggr_expr = pop_expr(&mut new_expr)?;
                 let new_group_expr = pop_expr(&mut new_expr)?;
 
-                Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
-                    Arc::new(new_input),
-                    new_group_expr,
-                    new_aggr_expr,
-                    schema.clone(),
-                )?))
+                Ok(Some(LogicalPlan::Aggregate(
+                    Aggregate::try_new_with_schema(
+                        Arc::new(new_input),
+                        new_group_expr,
+                        new_aggr_expr,
+                        schema.clone(),
+                    )?,
+                )))
             }
             LogicalPlan::Sort(Sort { expr, input, fetch }) => {
                 let input_schema = Arc::clone(input.schema());
@@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                     optimizer_config,
                 )?;
 
-                Ok(LogicalPlan::Sort(Sort {
+                Ok(Some(LogicalPlan::Sort(Sort {
                     expr: pop_expr(&mut new_expr)?,
                     input: Arc::new(new_input),
                     fetch: *fetch,
-                }))
+                })))
             }
             LogicalPlan::Join(_)
             | LogicalPlan::CrossJoin(_)
@@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::Extension(_)
             | LogicalPlan::Prepare(_) => {
                 // apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index 9cf9138bc..6b5bd184f 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -57,8 +57,10 @@ impl DecorrelateWhereExists {
         for it in filters.iter() {
             match it {
                 Expr::Exists { subquery, negated } => {
-                    let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
-                    let subquery = Arc::new(subquery);
+                    let subquery = self
+                        .try_optimize(&subquery.subquery, optimizer_config)?
+                        .map(Arc::new)
+                        .unwrap_or_else(|| subquery.subquery.clone());
                     let subquery = Subquery { subquery };
                     let subquery = SubqueryInfo::new(subquery.clone(), *negated);
                     subqueries.push(subquery);
@@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let predicate = filter.predicate();
-                let filter_input = filter.input();
+                let filter_input = filter.input().as_ref();
 
                 // Apply optimizer rule to current input
-                let optimized_input = self.optimize(filter_input, optimizer_config)?;
+                let optimized_input = self
+                    .try_optimize(filter_input, optimizer_config)?
+                    .unwrap_or_else(|| filter_input.clone());
 
                 let (subqueries, other_exprs) =
                     self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists {
                 }
 
                 // iterate through all exists clauses in predicate, turning each into a join
-                let mut cur_input = (**filter_input).clone();
+                let mut cur_input = filter_input.clone();
                 for subquery in subqueries {
                     if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
                     {
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index 1818e5897..4614b0699 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -59,8 +59,10 @@ impl DecorrelateWhereIn {
                     subquery,
                     negated,
                 } => {
-                    let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
-                    let subquery = Arc::new(subquery);
+                    let subquery = self
+                        .try_optimize(&subquery.subquery, optimizer_config)?
+                        .map(Arc::new)
+                        .unwrap_or_else(|| subquery.subquery.clone());
                     let subquery = Subquery { subquery };
                     let subquery =
                         SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated);
@@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> datafusion_common::Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> datafusion_common::Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let predicate = filter.predicate();
-                let filter_input = filter.input();
+                let filter_input = filter.input().as_ref();
 
                 // Apply optimizer rule to current input
-                let optimized_input = self.optimize(filter_input, optimizer_config)?;
+                let optimized_input = self
+                    .try_optimize(filter_input, optimizer_config)?
+                    .unwrap_or_else(|| filter_input.clone());
 
                 let (subqueries, other_exprs) =
                     self.extract_subquery_exprs(predicate, optimizer_config)?;
@@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn {
                 )?);
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
-                    return Ok(optimized_plan);
+                    return Ok(Some(optimized_plan));
                 }
 
                 // iterate through all exists clauses in predicate, turning each into a join
-                let mut cur_input = (**filter_input).clone();
+                let mut cur_input = filter_input.clone();
                 for subquery in subqueries {
                     cur_input = optimize_where_in(
                         &subquery,
@@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn {
                         optimizer_config,
                     )?;
                 }
-                Ok(cur_input)
+                Ok(Some(cur_input))
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index 8ca457771..b431742f0 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin {
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, _optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let input = (**filter.input()).clone();
@@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin {
                         )?;
                     }
                     _ => {
-                        return utils::optimize_children(self, plan, _optimizer_config);
+                        return Ok(Some(utils::optimize_children(
+                            self,
+                            plan,
+                            _optimizer_config,
+                        )?));
                     }
                 }
 
@@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin {
 
                 // if there are no join keys then do nothing.
                 if all_join_keys.is_empty() {
-                    Ok(LogicalPlan::Filter(Filter::try_new(
+                    Ok(Some(LogicalPlan::Filter(Filter::try_new(
                         predicate.clone(),
                         Arc::new(left),
-                    )?))
+                    )?)))
                 } else {
                     // remove join expressions from filter
                     match remove_join_expressions(predicate, &all_join_keys)? {
-                        Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
-                            filter_expr,
-                            Arc::new(left),
-                        )?)),
-                        _ => Ok(left),
+                        Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
+                            Filter::try_new(filter_expr, Arc::new(left))?,
+                        ))),
+                        _ => Ok(Some(left)),
                     }
                 }
             }
 
-            _ => utils::optimize_children(self, plan, _optimizer_config),
+            _ => Ok(Some(utils::optimize_children(
+                self,
+                plan,
+                _optimizer_config,
+            )?)),
         }
     }
 
diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs
index d5cbcf394..3e1c91a70 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter {
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, _optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         let predicate_and_input = match plan {
             LogicalPlan::Filter(filter) => match filter.predicate() {
                 Expr::Literal(ScalarValue::Boolean(Some(v))) => {
@@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter {
         };
 
         match predicate_and_input {
-            Some((true, input)) => self.optimize(input, _optimizer_config),
-            Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+            Some((true, input)) => self.try_optimize(input, _optimizer_config),
+            Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                 produce_one_row: false,
                 schema: input.schema().clone(),
-            })),
+            }))),
             None => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, _optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    _optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 9ad39d1db..b60b16e83 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -42,25 +42,43 @@ impl OptimizerRule for EliminateLimit {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         if let LogicalPlan::Limit(limit) = plan {
             match limit.fetch {
                 Some(fetch) => {
                     if fetch == 0 {
-                        return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                        return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                             produce_one_row: false,
                             schema: limit.input.schema().clone(),
-                        }));
+                        })));
                     }
                 }
                 None => {
                     if limit.skip == 0 {
                         let input = &*limit.input;
-                        return utils::optimize_children(self, input, optimizer_config);
+                        return Ok(Some(utils::optimize_children(
+                            self,
+                            input,
+                            optimizer_config,
+                        )?));
                     }
                 }
             }
         }
-        utils::optimize_children(self, plan, optimizer_config)
+        Ok(Some(utils::optimize_children(
+            self,
+            plan,
+            optimizer_config,
+        )?))
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 9be661624..4d1fecad7 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -67,6 +67,16 @@ impl OptimizerRule for EliminateOuterJoin {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => match filter.input().as_ref() {
                 LogicalPlan::Join(join) => {
@@ -110,11 +120,23 @@ impl OptimizerRule for EliminateOuterJoin {
                         null_equals_null: join.null_equals_null,
                     });
                     let new_plan = from_plan(plan, &plan.expressions(), &[new_join])?;
-                    utils::optimize_children(self, &new_plan, optimizer_config)
+                    Ok(Some(utils::optimize_children(
+                        self,
+                        &new_plan,
+                        optimizer_config,
+                    )?))
                 }
-                _ => utils::optimize_children(self, plan, optimizer_config),
+                _ => Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?)),
             },
-            _ => utils::optimize_children(self, plan, optimizer_config),
+            _ => Ok(Some(utils::optimize_children(
+                self,
+                plan,
+                optimizer_config,
+            )?)),
         }
     }
 
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs
index be33c796e..f1a4f1393 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -40,12 +40,28 @@ impl OptimizerRule for FilterNullJoinKeys {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> datafusion_common::Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> datafusion_common::Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
                 // recurse down first and optimize inputs
                 let mut join = join.clone();
-                join.left = Arc::new(self.optimize(&join.left, optimizer_config)?);
-                join.right = Arc::new(self.optimize(&join.right, optimizer_config)?);
+                join.left = Arc::new(
+                    self.try_optimize(&join.left, optimizer_config)?
+                        .unwrap_or_else(|| join.left.as_ref().clone()),
+                );
+                join.right = Arc::new(
+                    self.try_optimize(&join.right, optimizer_config)?
+                        .unwrap_or_else(|| join.right.as_ref().clone()),
+                );
 
                 let left_schema = join.left.schema();
                 let right_schema = join.right.schema();
@@ -80,11 +96,15 @@ impl OptimizerRule for FilterNullJoinKeys {
                         join.right.clone(),
                     )?));
                 }
-                Ok(LogicalPlan::Join(join))
+                Ok(Some(LogicalPlan::Join(join)))
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
index 7fdb594be..e0238a751 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -40,6 +40,16 @@ impl OptimizerRule for InlineTableScan {
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, _optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             // Match only on scans without filter / projection / fetch
             // Views and DataFrames won't have those added
@@ -57,17 +67,21 @@ impl OptimizerRule for InlineTableScan {
                     let plan = LogicalPlanBuilder::from(plan)
                         .project(vec![Expr::Wildcard])?
                         .alias(table_name)?;
-                    plan.build()
+                    Ok(Some(plan.build()?))
                 } else {
                     // No plan available, return with table scan as is
-                    Ok(plan.clone())
+                    Ok(Some(plan.clone()))
                 }
             }
 
             // Rest: Recurse
             _ => {
                 // apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, _optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    _optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs
index 2de7e72ef..cfdb354d4 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -39,11 +39,21 @@ impl OptimizerRule for PropagateEmptyRelation {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         // optimize child plans first
         let optimized_children_plan =
             utils::optimize_children(self, plan, optimizer_config)?;
         match &optimized_children_plan {
-            LogicalPlan::EmptyRelation(_) => Ok(optimized_children_plan),
+            LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)),
             LogicalPlan::Projection(_)
             | LogicalPlan::Filter(_)
             | LogicalPlan::Window(_)
@@ -51,19 +61,19 @@ impl OptimizerRule for PropagateEmptyRelation {
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Repartition(_)
             | LogicalPlan::Limit(_) => match empty_child(&optimized_children_plan)? {
-                Some(empty) => Ok(empty),
-                None => Ok(optimized_children_plan),
+                Some(empty) => Ok(Some(empty)),
+                None => Ok(Some(optimized_children_plan)),
             },
             LogicalPlan::CrossJoin(_) => {
                 let (left_empty, right_empty) =
                     binary_plan_children_is_empty(&optimized_children_plan)?;
                 if left_empty || right_empty {
-                    Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                    Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                         produce_one_row: false,
                         schema: optimized_children_plan.schema().clone(),
-                    }))
+                    })))
                 } else {
-                    Ok(optimized_children_plan)
+                    Ok(Some(optimized_children_plan))
                 }
             }
             LogicalPlan::Join(join) => {
@@ -83,15 +93,15 @@ impl OptimizerRule for PropagateEmptyRelation {
                     let (left_empty, right_empty) =
                         binary_plan_children_is_empty(&optimized_children_plan)?;
                     if left_empty || right_empty {
-                        Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                        Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                             produce_one_row: false,
                             schema: optimized_children_plan.schema().clone(),
-                        }))
+                        })))
                     } else {
-                        Ok(optimized_children_plan)
+                        Ok(Some(optimized_children_plan))
                     }
                 } else {
-                    Ok(optimized_children_plan)
+                    Ok(Some(optimized_children_plan))
                 }
             }
             LogicalPlan::Union(union) => {
@@ -106,40 +116,40 @@ impl OptimizerRule for PropagateEmptyRelation {
                     .collect::<Vec<_>>();
 
                 if new_inputs.len() == union.inputs.len() {
-                    Ok(optimized_children_plan)
+                    Ok(Some(optimized_children_plan))
                 } else if new_inputs.is_empty() {
-                    Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                    Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                         produce_one_row: false,
                         schema: optimized_children_plan.schema().clone(),
-                    }))
+                    })))
                 } else if new_inputs.len() == 1 {
                     let child = (**(union.inputs.get(0).unwrap())).clone();
                     if child.schema().eq(optimized_children_plan.schema()) {
-                        Ok(child)
+                        Ok(Some(child))
                     } else {
-                        Ok(LogicalPlan::Projection(Projection::new_from_schema(
+                        Ok(Some(LogicalPlan::Projection(Projection::new_from_schema(
                             Arc::new(child),
                             optimized_children_plan.schema().clone(),
-                        )))
+                        ))))
                     }
                 } else {
-                    Ok(LogicalPlan::Union(Union {
+                    Ok(Some(LogicalPlan::Union(Union {
                         inputs: new_inputs,
                         schema: union.schema.clone(),
-                    }))
+                    })))
                 }
             }
             LogicalPlan::Aggregate(agg) => {
                 if !agg.group_expr.is_empty() {
                     match empty_child(&optimized_children_plan)? {
-                        Some(empty) => Ok(empty),
-                        None => Ok(optimized_children_plan),
+                        Some(empty) => Ok(Some(empty)),
+                        None => Ok(Some(optimized_children_plan)),
                     }
                 } else {
-                    Ok(optimized_children_plan)
+                    Ok(Some(optimized_children_plan))
                 }
             }
-            _ => Ok(optimized_children_plan),
+            _ => Ok(Some(optimized_children_plan)),
         }
     }
 
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 61051a5a5..40ed00ebd 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -508,19 +508,41 @@ impl OptimizerRule for PushDownFilter {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         let filter = match plan {
             LogicalPlan::Filter(filter) => filter,
             // we also need to pushdown filter in Join.
             LogicalPlan::Join(join) => {
                 let optimized_plan = push_down_join(plan, join, None)?;
                 return match optimized_plan {
-                    Some(optimized_plan) => {
-                        utils::optimize_children(self, &optimized_plan, optimizer_config)
-                    }
-                    None => utils::optimize_children(self, plan, optimizer_config),
+                    Some(optimized_plan) => Ok(Some(utils::optimize_children(
+                        self,
+                        &optimized_plan,
+                        optimizer_config,
+                    )?)),
+                    None => Ok(Some(utils::optimize_children(
+                        self,
+                        plan,
+                        optimizer_config,
+                    )?)),
                 };
             }
-            _ => return utils::optimize_children(self, plan, optimizer_config),
+            _ => {
+                return Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
+            }
         };
 
         let child_plan = &**filter.input();
@@ -532,7 +554,7 @@ impl OptimizerRule for PushDownFilter {
                     new_predicate,
                     child_filter.input().clone(),
                 )?);
-                return self.optimize(&new_plan, optimizer_config);
+                return self.try_optimize(&new_plan, optimizer_config);
             }
             LogicalPlan::Repartition(_)
             | LogicalPlan::Distinct(_)
@@ -733,7 +755,11 @@ impl OptimizerRule for PushDownFilter {
             _ => plan.clone(),
         };
 
-        utils::optimize_children(self, &new_plan, optimizer_config)
+        Ok(Some(utils::optimize_children(
+            self,
+            &new_plan,
+            optimizer_config,
+        )?))
     }
 }
 
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 3ce4f441f..a404762b6 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -80,9 +80,25 @@ impl OptimizerRule for PushDownLimit {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         let limit = match plan {
             LogicalPlan::Limit(limit) => limit,
-            _ => return utils::optimize_children(self, plan, optimizer_config),
+            _ => {
+                return Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
+            }
         };
 
         if let LogicalPlan::Limit(child_limit) = &*limit.input {
@@ -112,12 +128,18 @@ impl OptimizerRule for PushDownLimit {
                 fetch: new_fetch,
                 input: Arc::new((*child_limit.input).clone()),
             });
-            return self.optimize(&plan, optimizer_config);
+            return self.try_optimize(&plan, optimizer_config);
         }
 
         let fetch = match limit.fetch {
             Some(fetch) => fetch,
-            None => return utils::optimize_children(self, plan, optimizer_config),
+            None => {
+                return Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
+            }
         };
         let skip = limit.skip;
 
@@ -225,7 +247,11 @@ impl OptimizerRule for PushDownLimit {
             _ => plan.clone(),
         };
 
-        utils::optimize_children(self, &plan, optimizer_config)
+        Ok(Some(utils::optimize_children(
+            self,
+            &plan,
+            optimizer_config,
+        )?))
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 2d156d1ce..0238148c3 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -51,6 +51,16 @@ impl OptimizerRule for PushDownProjection {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         // set of all columns referred by the plan (and thus considered required by the root)
         let required_columns = plan
             .schema()
@@ -58,7 +68,13 @@ impl OptimizerRule for PushDownProjection {
             .iter()
             .map(|f| f.qualified_column())
             .collect::<HashSet<Column>>();
-        optimize_plan(self, plan, &required_columns, false, optimizer_config)
+        Ok(Some(optimize_plan(
+            self,
+            plan,
+            &required_columns,
+            false,
+            optimizer_config,
+        )?))
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
index 6a2ba2bde..3f6cc763b 100644
--- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
+++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
@@ -129,17 +129,31 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, _optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let predicate = predicate(filter.predicate())?;
                 let rewritten_predicate = rewrite_predicate(predicate);
                 let rewritten_expr = normalize_predicate(rewritten_predicate);
-                Ok(LogicalPlan::Filter(Filter::try_new(
+                Ok(Some(LogicalPlan::Filter(Filter::try_new(
                     rewritten_expr,
                     Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?),
-                )?))
+                )?)))
             }
-            _ => utils::optimize_children(self, plan, _optimizer_config),
+            _ => Ok(Some(utils::optimize_children(
+                self,
+                plan,
+                _optimizer_config,
+            )?)),
         }
     }
 
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index f5a35c625..00e4d89c8 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -94,6 +94,16 @@ impl OptimizerRule for ScalarSubqueryToJoin {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
@@ -104,10 +114,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
 
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
-                    return Ok(LogicalPlan::Filter(Filter::try_new(
+                    return Ok(Some(LogicalPlan::Filter(Filter::try_new(
                         filter.predicate().clone(),
                         Arc::new(optimized_input),
-                    )?));
+                    )?)));
                 }
 
                 // iterate through all subqueries in predicate, turning each into a join
@@ -122,17 +132,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                         cur_input = optimized_subquery;
                     } else {
                         // if we can't handle all of the subqueries then bail for now
-                        return Ok(LogicalPlan::Filter(Filter::try_new(
+                        return Ok(Some(LogicalPlan::Filter(Filter::try_new(
                             filter.predicate().clone(),
                             Arc::new(optimized_input),
-                        )?));
+                        )?)));
                     }
                 }
-                Ok(cur_input)
+                Ok(Some(cur_input))
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 13cf6d3a8..1e1322395 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -86,8 +86,18 @@ impl OptimizerRule for SingleDistinctToGroupBy {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Aggregate(Aggregate {
                 input,
@@ -192,16 +202,26 @@ impl OptimizerRule for SingleDistinctToGroupBy {
                         new_aggr_exprs,
                     )?);
 
-                    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
-                        alias_expr,
-                        Arc::new(outer_aggr),
-                        schema.clone(),
-                    )?))
+                    Ok(Some(LogicalPlan::Projection(
+                        Projection::try_new_with_schema(
+                            alias_expr,
+                            Arc::new(outer_aggr),
+                            schema.clone(),
+                        )?,
+                    )))
                 } else {
-                    utils::optimize_children(self, plan, _optimizer_config)
+                    Ok(Some(utils::optimize_children(
+                        self,
+                        plan,
+                        _optimizer_config,
+                    )?))
                 }
             }
-            _ => utils::optimize_children(self, plan, _optimizer_config),
+            _ => Ok(Some(utils::optimize_children(
+                self,
+                plan,
+                _optimizer_config,
+            )?)),
         }
     }
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index c077ce7e8..ccf6931d3 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -54,6 +54,16 @@ impl OptimizerRule for SubqueryFilterToJoin {
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
@@ -78,10 +88,10 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 })?;
 
                 if !subqueries_in_regular.is_empty() {
-                    return Ok(LogicalPlan::Filter(Filter::try_new(
+                    return Ok(Some(LogicalPlan::Filter(Filter::try_new(
                         filter.predicate().clone(),
                         Arc::new(optimized_input),
-                    )?));
+                    )?)));
                 };
 
                 // Add subquery joins to new_input
@@ -150,23 +160,27 @@ impl OptimizerRule for SubqueryFilterToJoin {
                 let new_input = match opt_result {
                     Ok(plan) => plan,
                     Err(_) => {
-                        return Ok(LogicalPlan::Filter(Filter::try_new(
+                        return Ok(Some(LogicalPlan::Filter(Filter::try_new(
                             filter.predicate().clone(),
                             Arc::new(optimized_input),
-                        )?))
+                        )?)))
                     }
                 };
 
                 // Apply regular filters to join output if some or just return join
                 if regular_filters.is_empty() {
-                    Ok(new_input)
+                    Ok(Some(new_input))
                 } else {
-                    utils::add_filter(new_input, &regular_filters)
+                    Ok(Some(utils::add_filter(new_input, &regular_filters)?))
                 }
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs
index 698abff94..b3e1f4e75 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -58,9 +58,19 @@ impl OptimizerRule for TypeCoercion {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        optimize_internal(&DFSchema::empty(), plan)
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        Ok(Some(optimize_internal(&DFSchema::empty(), plan)?))
     }
 }
 
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 8e17c184e..bfcb0f85a 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -84,42 +84,55 @@ impl OptimizerRule for UnwrapCastInComparison {
         plan: &LogicalPlan,
         _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        optimize(plan)
+        Ok(self
+            .try_optimize(plan, _optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
     }
 
-    fn name(&self) -> &str {
-        "unwrap_cast_in_comparison"
-    }
-}
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        let new_inputs = plan
+            .inputs()
+            .into_iter()
+            .map(|input| {
+                self.try_optimize(input, _optimizer_config)
+                    .map(|o| o.unwrap_or_else(|| input.clone()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let mut schema = new_inputs.iter().map(|input| input.schema()).fold(
+            DFSchema::empty(),
+            |mut lhs, rhs| {
+                lhs.merge(rhs);
+                lhs
+            },
+        );
 
-fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
-    let new_inputs = plan
-        .inputs()
-        .iter()
-        .map(|input| optimize(input))
-        .collect::<Result<Vec<_>>>()?;
-
-    let mut schema = new_inputs.iter().map(|input| input.schema()).fold(
-        DFSchema::empty(),
-        |mut lhs, rhs| {
-            lhs.merge(rhs);
-            lhs
-        },
-    );
-
-    schema.merge(plan.schema());
-
-    let mut expr_rewriter = UnwrapCastExprRewriter {
-        schema: Arc::new(schema),
-    };
+        schema.merge(plan.schema());
+
+        let mut expr_rewriter = UnwrapCastExprRewriter {
+            schema: Arc::new(schema),
+        };
 
-    let new_exprs = plan
-        .expressions()
-        .into_iter()
-        .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
-        .collect::<Result<Vec<_>>>()?;
+        let new_exprs = plan
+            .expressions()
+            .into_iter()
+            .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Some(from_plan(
+            plan,
+            new_exprs.as_slice(),
+            new_inputs.as_slice(),
+        )?))
+    }
 
-    from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice())
+    fn name(&self) -> &str {
+        "unwrap_cast_in_comparison"
+    }
 }
 
 struct UnwrapCastExprRewriter {