You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/15 19:01:28 UTC

[arrow-datafusion] branch master updated: Add try_optimize method (#4208)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 28ca3eeac Add try_optimize method (#4208)
28ca3eeac is described below

commit 28ca3eeac437994be836d299748d3fb3e0ed8b04
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Nov 15 12:01:22 2022 -0700

    Add try_optimize method (#4208)
    
    * Add try_optimize method
    
    * lint
    
    * address feedback
---
 .../optimizer/src/decorrelate_where_exists.rs      | 76 +++++++++++++---------
 datafusion/optimizer/src/optimizer.rs              | 21 +++++-
 datafusion/optimizer/src/test/mod.rs               |  8 +++
 datafusion/optimizer/src/utils.rs                  | 11 ++--
 4 files changed, 77 insertions(+), 39 deletions(-)

diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index b52c174b5..9cf9138bc 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -20,9 +20,11 @@ use crate::utils::{
     verify_not_disjunction,
 };
 use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{context, plan_err, DataFusionError};
-use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
-use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_common::{context, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, JoinType, Subquery},
+    Expr, LogicalPlan, LogicalPlanBuilder,
+};
 use std::sync::Arc;
 
 /// Optimizer rule for rewriting subquery filters to joins
@@ -47,7 +49,7 @@ impl DecorrelateWhereExists {
         &self,
         predicate: &Expr,
         optimizer_config: &mut OptimizerConfig,
-    ) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
+    ) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
         let filters = split_conjunction(predicate);
 
         let mut subqueries = vec![];
@@ -74,7 +76,17 @@ impl OptimizerRule for DecorrelateWhereExists {
         &self,
         plan: &LogicalPlan,
         optimizer_config: &mut OptimizerConfig,
-    ) -> datafusion_common::Result<LogicalPlan> {
+    ) -> Result<LogicalPlan> {
+        Ok(self
+            .try_optimize(plan, optimizer_config)?
+            .unwrap_or_else(|| plan.clone()))
+    }
+
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let predicate = filter.predicate();
@@ -91,19 +103,28 @@ impl OptimizerRule for DecorrelateWhereExists {
                 )?);
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
-                    return Ok(optimized_plan);
+                    return Ok(Some(optimized_plan));
                 }
 
                 // iterate through all exists clauses in predicate, turning each into a join
                 let mut cur_input = (**filter_input).clone();
                 for subquery in subqueries {
-                    cur_input = optimize_exists(&subquery, &cur_input, &other_exprs)?;
+                    if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
+                    {
+                        cur_input = x;
+                    } else {
+                        return Ok(None);
+                    }
                 }
-                Ok(cur_input)
+                Ok(Some(cur_input))
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, optimizer_config)
+                Ok(Some(utils::optimize_children(
+                    self,
+                    plan,
+                    optimizer_config,
+                )?))
             }
         }
     }
@@ -132,20 +153,22 @@ fn optimize_exists(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
     outer_other_exprs: &[Expr],
-) -> datafusion_common::Result<LogicalPlan> {
+) -> Result<Option<LogicalPlan>> {
     let subqry_filter = match query_info.query.subquery.as_ref() {
         LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() {
             LogicalPlan::Projection(subqry_proj) => {
                 Filter::try_from_plan(&subqry_proj.input)
             }
-            _ => Err(DataFusionError::NotImplemented(
-                "Subquery currently only supports distinct or projection".to_string(),
-            )),
+            _ => {
+                // Subquery currently only supports distinct or projection
+                return Ok(None);
+            }
         },
         LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input),
-        _ => Err(DataFusionError::NotImplemented(
-            "Subquery currently only supports distinct or projection".to_string(),
-        )),
+        _ => {
+            // Subquery currently only supports distinct or projection
+            return Ok(None);
+        }
     }
     .map_err(|e| context!("cannot optimize non-correlated subquery", e))?;
 
@@ -159,7 +182,8 @@ fn optimize_exists(
     let (outer_cols, subqry_cols, join_filters) =
         exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
     if subqry_cols.is_empty() || outer_cols.is_empty() {
-        plan_err!("cannot optimize non-correlated subquery")?;
+        // cannot optimize non-correlated subquery
+        return Ok(None);
     }
 
     // build subquery side of join - the thing the subquery was querying
@@ -188,7 +212,7 @@ fn optimize_exists(
     }
 
     let result = new_plan.build()?;
-    Ok(result)
+    Ok(Some(result))
 }
 
 struct SubqueryInfo {
@@ -318,9 +342,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"cannot optimize non-correlated subquery"#;
-
-        assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+        assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
         Ok(())
     }
 
@@ -339,9 +361,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"cannot optimize non-correlated subquery"#;
-
-        assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+        assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
         Ok(())
     }
 
@@ -360,9 +380,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"cannot optimize non-correlated subquery"#;
-
-        assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+        assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
         Ok(())
     }
 
@@ -426,9 +444,7 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"cannot optimize non-correlated subquery"#;
-
-        assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+        assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
         Ok(())
     }
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 315f47499..d18e21b9f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -48,7 +48,18 @@ use std::time::Instant;
 /// way. If there are no suitable transformations for the input plan,
 /// the optimizer can simply return it as is.
 pub trait OptimizerRule {
-    /// Rewrite `plan` to an optimized form
+    /// Try to rewrite `plan` to an optimized form, returning `None` if the plan cannot be
+    /// optimized by this rule.
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        self.optimize(plan, optimizer_config).map(Some)
+    }
+
+    /// Rewrite `plan` to an optimized form. This method will eventually be deprecated and
+    /// replaced by `try_optimize`.
     fn optimize(
         &self,
         plan: &LogicalPlan,
@@ -209,13 +220,17 @@ impl Optimizer {
             log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
 
             for rule in &self.rules {
-                let result = rule.optimize(&new_plan, optimizer_config);
+                let result = rule.try_optimize(&new_plan, optimizer_config);
                 match result {
-                    Ok(plan) => {
+                    Ok(Some(plan)) => {
                         new_plan = plan;
                         observer(&new_plan, rule.as_ref());
                         log_plan(rule.name(), &new_plan);
                     }
+                    Ok(None) => {
+                        observer(&new_plan, rule.as_ref());
+                        log_plan(rule.name(), &new_plan);
+                    }
                     Err(ref e) => {
                         if optimizer_config.skip_failing_rules {
                             // Note to future readers: if you see this warning it signals a
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index fc7d0bd8a..f5f93517e 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -129,3 +129,11 @@ pub fn assert_optimizer_err(
         }
     }
 }
+
+pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) {
+    let new_plan = rule.optimize(plan, &mut OptimizerConfig::new()).unwrap();
+    assert_eq!(
+        format!("{}", plan.display_indent()),
+        format!("{}", new_plan.display_indent())
+    );
+}
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index c5496b523..e2d326c16 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -43,12 +43,11 @@ pub fn optimize_children(
     optimizer_config: &mut OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let new_exprs = plan.expressions();
-    let new_inputs = plan
-        .inputs()
-        .into_iter()
-        .map(|plan| optimizer.optimize(plan, optimizer_config))
-        .collect::<Result<Vec<_>>>()?;
-
+    let mut new_inputs = Vec::with_capacity(plan.inputs().len());
+    for input in plan.inputs() {
+        let new_input = optimizer.try_optimize(input, optimizer_config)?;
+        new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
+    }
     from_plan(plan, &new_exprs, &new_inputs)
 }