You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/14 17:23:31 UTC

[arrow-datafusion] branch master updated: Update `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d37dccf6d Update  `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)
d37dccf6d is described below

commit d37dccf6d825762ba5d25a7ac1d10a1f0756e0a6
Author: Remzi Yang <59...@users.noreply.github.com>
AuthorDate: Sun Jan 15 01:23:25 2023 +0800

    Update  `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)
    
    * refactor optimize_children
    
    Signed-off-by: remzi <13...@gmail.com>
    
    * comment
    
    Signed-off-by: remzi <13...@gmail.com>
    
    * address comment
    
    Signed-off-by: remzi <13...@gmail.com>
    
    Signed-off-by: remzi <13...@gmail.com>
---
 datafusion-examples/examples/rewrite_expr.rs       | 21 +++++++++---
 datafusion/core/tests/user_defined_plan.rs         |  2 +-
 .../optimizer/src/common_subexpr_eliminate.rs      | 37 +++++++++++++---------
 datafusion/optimizer/src/eliminate_cross_join.rs   |  6 ++--
 datafusion/optimizer/src/push_down_filter.rs       | 18 ++++++-----
 datafusion/optimizer/src/utils.rs                  | 12 +++++--
 6 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index b63f8b176..3e87dbe91 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -79,10 +79,9 @@ impl OptimizerRule for MyRule {
         config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // recurse down and optimize children first
-        let plan = utils::optimize_children(self, plan, config)?;
-
-        match plan {
-            LogicalPlan::Filter(filter) => {
+        let optimized_plan = utils::optimize_children(self, plan, config)?;
+        match optimized_plan {
+            Some(LogicalPlan::Filter(filter)) => {
                 let mut expr_rewriter = MyExprRewriter {};
                 let predicate = filter.predicate.clone();
                 let predicate = predicate.rewrite(&mut expr_rewriter)?;
@@ -91,7 +90,19 @@ impl OptimizerRule for MyRule {
                     filter.input,
                 )?)))
             }
-            _ => Ok(Some(plan.clone())),
+            Some(optimized_plan) => Ok(Some(optimized_plan)),
+            None => match plan {
+                LogicalPlan::Filter(filter) => {
+                    let mut expr_rewriter = MyExprRewriter {};
+                    let predicate = filter.predicate.clone();
+                    let predicate = predicate.rewrite(&mut expr_rewriter)?;
+                    Ok(Some(LogicalPlan::Filter(Filter::try_new(
+                        predicate,
+                        filter.input.clone(),
+                    )?)))
+                }
+                _ => Ok(None),
+            },
         }
     }
 }
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index dcd182808..235e1d83b 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -318,7 +318,7 @@ impl OptimizerRule for TopKOptimizerRule {
 
         // If we didn't find the Limit/Sort combination, recurse as
         // normal and build the result.
-        Ok(Some(optimize_children(self, plan, config)?))
+        optimize_children(self, plan, config)
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 0ac5dc69b..ba073c988 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -102,7 +102,7 @@ impl OptimizerRule for CommonSubexprEliminate {
         let mut expr_set = ExprSet::new();
 
         let original_schema = plan.schema().clone();
-        let mut optimized_plan = match plan {
+        let optimized_plan = match plan {
             LogicalPlan::Projection(Projection {
                 expr,
                 input,
@@ -115,11 +115,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let (mut new_expr, new_input) =
                     self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;
 
-                LogicalPlan::Projection(Projection::try_new_with_schema(
+                Some(LogicalPlan::Projection(Projection::try_new_with_schema(
                     pop_expr(&mut new_expr)?,
                     Arc::new(new_input),
                     schema.clone(),
-                )?)
+                )?))
             }
             LogicalPlan::Filter(filter) => {
                 let input = &filter.input;
@@ -142,7 +142,10 @@ impl OptimizerRule for CommonSubexprEliminate {
                 )?;
 
                 if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
-                    LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(new_input))?)
+                    Some(LogicalPlan::Filter(Filter::try_new(
+                        predicate,
+                        Arc::new(new_input),
+                    )?))
                 } else {
                     return Err(DataFusionError::Internal(
                         "Failed to pop predicate expr".to_string(),
@@ -165,11 +168,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                     config,
                 )?;
 
-                LogicalPlan::Window(Window {
+                Some(LogicalPlan::Window(Window {
                     input: Arc::new(new_input),
                     window_expr: pop_expr(&mut new_expr)?,
                     schema: schema.clone(),
-                })
+                }))
             }
             LogicalPlan::Aggregate(Aggregate {
                 group_expr,
@@ -194,12 +197,12 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let new_aggr_expr = pop_expr(&mut new_expr)?;
                 let new_group_expr = pop_expr(&mut new_expr)?;
 
-                LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                Some(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
                     Arc::new(new_input),
                     new_group_expr,
                     new_aggr_expr,
                     schema.clone(),
-                )?)
+                )?))
             }
             LogicalPlan::Sort(Sort { expr, input, fetch }) => {
                 let input_schema = Arc::clone(input.schema());
@@ -208,11 +211,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let (mut new_expr, new_input) =
                     self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;
 
-                LogicalPlan::Sort(Sort {
+                Some(LogicalPlan::Sort(Sort {
                     expr: pop_expr(&mut new_expr)?,
                     input: Arc::new(new_input),
                     fetch: *fetch,
-                })
+                }))
             }
             LogicalPlan::Join(_)
             | LogicalPlan::CrossJoin(_)
@@ -242,12 +245,16 @@ impl OptimizerRule for CommonSubexprEliminate {
             }
         };
 
-        // add an additional projection if the output schema changed.
-        if optimized_plan.schema() != &original_schema {
-            optimized_plan = build_recover_project_plan(&original_schema, optimized_plan);
+        match optimized_plan {
+            Some(optimized_plan) if optimized_plan.schema() != &original_schema => {
+                // add an additional projection if the output schema changed.
+                Ok(Some(build_recover_project_plan(
+                    &original_schema,
+                    optimized_plan,
+                )))
+            }
+            plan => Ok(plan),
         }
-
-        Ok(Some(optimized_plan))
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index e0bb11430..c19b43d29 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -82,7 +82,7 @@ impl OptimizerRule for EliminateCrossJoin {
                         )?;
                     }
                     _ => {
-                        return Ok(Some(utils::optimize_children(self, plan, config)?));
+                        return utils::optimize_children(self, plan, config);
                     }
                 }
 
@@ -102,7 +102,7 @@ impl OptimizerRule for EliminateCrossJoin {
                     )?;
                 }
 
-                left = utils::optimize_children(self, &left, config)?;
+                left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
 
                 if plan.schema() != left.schema() {
                     left = LogicalPlan::Projection(Projection::new_from_schema(
@@ -128,7 +128,7 @@ impl OptimizerRule for EliminateCrossJoin {
                 }
             }
 
-            _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
+            _ => utils::optimize_children(self, plan, config),
         }
     }
 
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 35c5dacfa..99b3f43b1 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -28,6 +28,7 @@ use datafusion_expr::{
 use std::collections::{HashMap, HashSet};
 use std::iter::once;
 use std::sync::Arc;
+use utils::optimize_children;
 
 /// Push Down Filter optimizer rule pushes filter clauses down the plan
 /// # Introduction
@@ -524,15 +525,14 @@ impl OptimizerRule for PushDownFilter {
             LogicalPlan::Join(join) => {
                 let optimized_plan = push_down_join(plan, join, None)?;
                 return match optimized_plan {
-                    Some(optimized_plan) => Ok(Some(utils::optimize_children(
-                        self,
-                        &optimized_plan,
-                        config,
-                    )?)),
-                    None => Ok(Some(utils::optimize_children(self, plan, config)?)),
+                    Some(optimized_plan) => Ok(Some(
+                        optimize_children(self, &optimized_plan, config)?
+                            .unwrap_or(optimized_plan),
+                    )),
+                    None => optimize_children(self, plan, config),
                 };
             }
-            _ => return Ok(Some(utils::optimize_children(self, plan, config)?)),
+            _ => return optimize_children(self, plan, config),
         };
 
         let child_plan = filter.input.as_ref();
@@ -749,7 +749,9 @@ impl OptimizerRule for PushDownFilter {
             _ => plan.clone(),
         };
 
-        Ok(Some(utils::optimize_children(self, &new_plan, config)?))
+        Ok(Some(
+            optimize_children(self, &new_plan, config)?.unwrap_or(new_plan),
+        ))
     }
 }
 
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index ed0f07f55..a5f06bc82 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -37,18 +37,26 @@ use std::sync::Arc;
 /// type. Useful for optimizer rules which want to leave the type
 /// of plan unchanged but still apply to the children.
 /// This also handles the case when the `plan` is a [`LogicalPlan::Explain`].
+///
+/// Returning `Ok(None)` indicates that the plan can't be optimized by the `optimizer`.
 pub fn optimize_children(
     optimizer: &impl OptimizerRule,
     plan: &LogicalPlan,
     config: &dyn OptimizerConfig,
-) -> Result<LogicalPlan> {
+) -> Result<Option<LogicalPlan>> {
     let new_exprs = plan.expressions();
     let mut new_inputs = Vec::with_capacity(plan.inputs().len());
+    let mut plan_is_changed = false;
     for input in plan.inputs() {
         let new_input = optimizer.try_optimize(input, config)?;
+        plan_is_changed = plan_is_changed || new_input.is_some();
         new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
     }
-    from_plan(plan, &new_exprs, &new_inputs)
+    if plan_is_changed {
+        Ok(Some(from_plan(plan, &new_exprs, &new_inputs)?))
+    } else {
+        Ok(None)
+    }
 }
 
 /// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`