You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/15 19:01:28 UTC
[arrow-datafusion] branch master updated: Add try_optimize method (#4208)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 28ca3eeac Add try_optimize method (#4208)
28ca3eeac is described below
commit 28ca3eeac437994be836d299748d3fb3e0ed8b04
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Nov 15 12:01:22 2022 -0700
Add try_optimize method (#4208)
* Add try_optimize method
* lint
* address feedback
---
.../optimizer/src/decorrelate_where_exists.rs | 76 +++++++++++++---------
datafusion/optimizer/src/optimizer.rs | 21 +++++-
datafusion/optimizer/src/test/mod.rs | 8 +++
datafusion/optimizer/src/utils.rs | 11 ++--
4 files changed, 77 insertions(+), 39 deletions(-)
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index b52c174b5..9cf9138bc 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -20,9 +20,11 @@ use crate::utils::{
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{context, plan_err, DataFusionError};
-use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
-use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_common::{context, Result};
+use datafusion_expr::{
+ logical_plan::{Filter, JoinType, Subquery},
+ Expr, LogicalPlan, LogicalPlanBuilder,
+};
use std::sync::Arc;
/// Optimizer rule for rewriting subquery filters to joins
@@ -47,7 +49,7 @@ impl DecorrelateWhereExists {
&self,
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
+ ) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate);
let mut subqueries = vec![];
@@ -74,7 +76,17 @@ impl OptimizerRule for DecorrelateWhereExists {
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<LogicalPlan> {
+ ) -> Result<LogicalPlan> {
+ Ok(self
+ .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone()))
+ }
+
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
@@ -91,19 +103,28 @@ impl OptimizerRule for DecorrelateWhereExists {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
- return Ok(optimized_plan);
+ return Ok(Some(optimized_plan));
}
// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
for subquery in subqueries {
- cur_input = optimize_exists(&subquery, &cur_input, &other_exprs)?;
+ if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
+ {
+ cur_input = x;
+ } else {
+ return Ok(None);
+ }
}
- Ok(cur_input)
+ Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, optimizer_config)
+ Ok(Some(utils::optimize_children(
+ self,
+ plan,
+ optimizer_config,
+ )?))
}
}
}
@@ -132,20 +153,22 @@ fn optimize_exists(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
outer_other_exprs: &[Expr],
-) -> datafusion_common::Result<LogicalPlan> {
+) -> Result<Option<LogicalPlan>> {
let subqry_filter = match query_info.query.subquery.as_ref() {
LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() {
LogicalPlan::Projection(subqry_proj) => {
Filter::try_from_plan(&subqry_proj.input)
}
- _ => Err(DataFusionError::NotImplemented(
- "Subquery currently only supports distinct or projection".to_string(),
- )),
+ _ => {
+ // Subquery currently only supports distinct or projection
+ return Ok(None);
+ }
},
LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input),
- _ => Err(DataFusionError::NotImplemented(
- "Subquery currently only supports distinct or projection".to_string(),
- )),
+ _ => {
+ // Subquery currently only supports distinct or projection
+ return Ok(None);
+ }
}
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;
@@ -159,7 +182,8 @@ fn optimize_exists(
let (outer_cols, subqry_cols, join_filters) =
exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
if subqry_cols.is_empty() || outer_cols.is_empty() {
- plan_err!("cannot optimize non-correlated subquery")?;
+ // cannot optimize non-correlated subquery
+ return Ok(None);
}
// build subquery side of join - the thing the subquery was querying
@@ -188,7 +212,7 @@ fn optimize_exists(
}
let result = new_plan.build()?;
- Ok(result)
+ Ok(Some(result))
}
struct SubqueryInfo {
@@ -318,9 +342,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = r#"cannot optimize non-correlated subquery"#;
-
- assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+ assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}
@@ -339,9 +361,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = r#"cannot optimize non-correlated subquery"#;
-
- assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+ assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}
@@ -360,9 +380,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = r#"cannot optimize non-correlated subquery"#;
-
- assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+ assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}
@@ -426,9 +444,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;
- let expected = r#"cannot optimize non-correlated subquery"#;
-
- assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
+ assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 315f47499..d18e21b9f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -48,7 +48,18 @@ use std::time::Instant;
/// way. If there are no suitable transformations for the input plan,
/// the optimizer can simply return it as is.
pub trait OptimizerRule {
- /// Rewrite `plan` to an optimized form
+ /// Try to rewrite `plan` to an optimized form, returning None if the plan cannot be
+ /// optimized by this rule.
+ fn try_optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ self.optimize(plan, optimizer_config).map(Some)
+ }
+
+ /// Rewrite `plan` to an optimized form. This method will eventually be deprecated and
+ /// replaced by `try_optimize`.
fn optimize(
&self,
plan: &LogicalPlan,
@@ -209,13 +220,17 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
for rule in &self.rules {
- let result = rule.optimize(&new_plan, optimizer_config);
+ let result = rule.try_optimize(&new_plan, optimizer_config);
match result {
- Ok(plan) => {
+ Ok(Some(plan)) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
+ Ok(None) => {
+ observer(&new_plan, rule.as_ref());
+ log_plan(rule.name(), &new_plan);
+ }
Err(ref e) => {
if optimizer_config.skip_failing_rules {
// Note to future readers: if you see this warning it signals a
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index fc7d0bd8a..f5f93517e 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -129,3 +129,11 @@ pub fn assert_optimizer_err(
}
}
}
+
+pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) {
+ let new_plan = rule.optimize(plan, &mut OptimizerConfig::new()).unwrap();
+ assert_eq!(
+ format!("{}", plan.display_indent()),
+ format!("{}", new_plan.display_indent())
+ );
+}
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index c5496b523..e2d326c16 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -43,12 +43,11 @@ pub fn optimize_children(
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
- let new_inputs = plan
- .inputs()
- .into_iter()
- .map(|plan| optimizer.optimize(plan, optimizer_config))
- .collect::<Result<Vec<_>>>()?;
-
+ let mut new_inputs = Vec::with_capacity(plan.inputs().len());
+ for input in plan.inputs() {
+ let new_input = optimizer.try_optimize(input, optimizer_config)?;
+ new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
+ }
from_plan(plan, &new_exprs, &new_inputs)
}