You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/14 17:23:31 UTC
[arrow-datafusion] branch master updated: Update `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d37dccf6d Update `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)
d37dccf6d is described below
commit d37dccf6d825762ba5d25a7ac1d10a1f0756e0a6
Author: Remzi Yang <59...@users.noreply.github.com>
AuthorDate: Sun Jan 15 01:23:25 2023 +0800
Update `optimize_children` to return `Result<Option<LogicalPlan>>` (#4888)
* refactor optimize_children
Signed-off-by: remzi <13...@gmail.com>
* comment
Signed-off-by: remzi <13...@gmail.com>
* address comment
Signed-off-by: remzi <13...@gmail.com>
Signed-off-by: remzi <13...@gmail.com>
---
datafusion-examples/examples/rewrite_expr.rs | 21 +++++++++---
datafusion/core/tests/user_defined_plan.rs | 2 +-
.../optimizer/src/common_subexpr_eliminate.rs | 37 +++++++++++++---------
datafusion/optimizer/src/eliminate_cross_join.rs | 6 ++--
datafusion/optimizer/src/push_down_filter.rs | 18 ++++++-----
datafusion/optimizer/src/utils.rs | 12 +++++--
6 files changed, 62 insertions(+), 34 deletions(-)
diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index b63f8b176..3e87dbe91 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -79,10 +79,9 @@ impl OptimizerRule for MyRule {
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
- let plan = utils::optimize_children(self, plan, config)?;
-
- match plan {
- LogicalPlan::Filter(filter) => {
+ let optimized_plan = utils::optimize_children(self, plan, config)?;
+ match optimized_plan {
+ Some(LogicalPlan::Filter(filter)) => {
let mut expr_rewriter = MyExprRewriter {};
let predicate = filter.predicate.clone();
let predicate = predicate.rewrite(&mut expr_rewriter)?;
@@ -91,7 +90,19 @@ impl OptimizerRule for MyRule {
filter.input,
)?)))
}
- _ => Ok(Some(plan.clone())),
+ Some(optimized_plan) => Ok(Some(optimized_plan)),
+ None => match plan {
+ LogicalPlan::Filter(filter) => {
+ let mut expr_rewriter = MyExprRewriter {};
+ let predicate = filter.predicate.clone();
+ let predicate = predicate.rewrite(&mut expr_rewriter)?;
+ Ok(Some(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ filter.input.clone(),
+ )?)))
+ }
+ _ => Ok(None),
+ },
}
}
}
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index dcd182808..235e1d83b 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -318,7 +318,7 @@ impl OptimizerRule for TopKOptimizerRule {
// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
- Ok(Some(optimize_children(self, plan, config)?))
+ optimize_children(self, plan, config)
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 0ac5dc69b..ba073c988 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -102,7 +102,7 @@ impl OptimizerRule for CommonSubexprEliminate {
let mut expr_set = ExprSet::new();
let original_schema = plan.schema().clone();
- let mut optimized_plan = match plan {
+ let optimized_plan = match plan {
LogicalPlan::Projection(Projection {
expr,
input,
@@ -115,11 +115,11 @@ impl OptimizerRule for CommonSubexprEliminate {
let (mut new_expr, new_input) =
self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;
- LogicalPlan::Projection(Projection::try_new_with_schema(
+ Some(LogicalPlan::Projection(Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
- )?)
+ )?))
}
LogicalPlan::Filter(filter) => {
let input = &filter.input;
@@ -142,7 +142,10 @@ impl OptimizerRule for CommonSubexprEliminate {
)?;
if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
- LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(new_input))?)
+ Some(LogicalPlan::Filter(Filter::try_new(
+ predicate,
+ Arc::new(new_input),
+ )?))
} else {
return Err(DataFusionError::Internal(
"Failed to pop predicate expr".to_string(),
@@ -165,11 +168,11 @@ impl OptimizerRule for CommonSubexprEliminate {
config,
)?;
- LogicalPlan::Window(Window {
+ Some(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
- })
+ }))
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
@@ -194,12 +197,12 @@ impl OptimizerRule for CommonSubexprEliminate {
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;
- LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+ Some(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
- )?)
+ )?))
}
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
let input_schema = Arc::clone(input.schema());
@@ -208,11 +211,11 @@ impl OptimizerRule for CommonSubexprEliminate {
let (mut new_expr, new_input) =
self.rewrite_expr(&[expr], &[&arrays], input, &mut expr_set, config)?;
- LogicalPlan::Sort(Sort {
+ Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
input: Arc::new(new_input),
fetch: *fetch,
- })
+ }))
}
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
@@ -242,12 +245,16 @@ impl OptimizerRule for CommonSubexprEliminate {
}
};
- // add an additional projection if the output schema changed.
- if optimized_plan.schema() != &original_schema {
- optimized_plan = build_recover_project_plan(&original_schema, optimized_plan);
+ match optimized_plan {
+ Some(optimized_plan) if optimized_plan.schema() != &original_schema => {
+ // add an additional projection if the output schema changed.
+ Ok(Some(build_recover_project_plan(
+ &original_schema,
+ optimized_plan,
+ )))
+ }
+ plan => Ok(plan),
}
-
- Ok(Some(optimized_plan))
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index e0bb11430..c19b43d29 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -82,7 +82,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
_ => {
- return Ok(Some(utils::optimize_children(self, plan, config)?));
+ return utils::optimize_children(self, plan, config);
}
}
@@ -102,7 +102,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
- left = utils::optimize_children(self, &left, config)?;
+ left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
@@ -128,7 +128,7 @@ impl OptimizerRule for EliminateCrossJoin {
}
}
- _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
+ _ => utils::optimize_children(self, plan, config),
}
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 35c5dacfa..99b3f43b1 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -28,6 +28,7 @@ use datafusion_expr::{
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;
+use utils::optimize_children;
/// Push Down Filter optimizer rule pushes filter clauses down the plan
/// # Introduction
@@ -524,15 +525,14 @@ impl OptimizerRule for PushDownFilter {
LogicalPlan::Join(join) => {
let optimized_plan = push_down_join(plan, join, None)?;
return match optimized_plan {
- Some(optimized_plan) => Ok(Some(utils::optimize_children(
- self,
- &optimized_plan,
- config,
- )?)),
- None => Ok(Some(utils::optimize_children(self, plan, config)?)),
+ Some(optimized_plan) => Ok(Some(
+ optimize_children(self, &optimized_plan, config)?
+ .unwrap_or(optimized_plan),
+ )),
+ None => optimize_children(self, plan, config),
};
}
- _ => return Ok(Some(utils::optimize_children(self, plan, config)?)),
+ _ => return optimize_children(self, plan, config),
};
let child_plan = filter.input.as_ref();
@@ -749,7 +749,9 @@ impl OptimizerRule for PushDownFilter {
_ => plan.clone(),
};
- Ok(Some(utils::optimize_children(self, &new_plan, config)?))
+ Ok(Some(
+ optimize_children(self, &new_plan, config)?.unwrap_or(new_plan),
+ ))
}
}
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index ed0f07f55..a5f06bc82 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -37,18 +37,26 @@ use std::sync::Arc;
/// type. Useful for optimizer rules which want to leave the type
/// of plan unchanged but still apply to the children.
/// This also handles the case when the `plan` is a [`LogicalPlan::Explain`].
+///
+/// Returning `Ok(None)` indicates that the plan can't be optimized by the `optimizer`.
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
-) -> Result<LogicalPlan> {
+) -> Result<Option<LogicalPlan>> {
let new_exprs = plan.expressions();
let mut new_inputs = Vec::with_capacity(plan.inputs().len());
+ let mut plan_is_changed = false;
for input in plan.inputs() {
let new_input = optimizer.try_optimize(input, config)?;
+ plan_is_changed = plan_is_changed || new_input.is_some();
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
- from_plan(plan, &new_exprs, &new_inputs)
+ if plan_is_changed {
+ Ok(Some(from_plan(plan, &new_exprs, &new_inputs)?))
+ } else {
+ Ok(None)
+ }
}
/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]`