You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/06/01 12:17:45 UTC
[arrow-datafusion] branch master updated: Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 91352518d Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)
91352518d is described below
commit 91352518dd05af9491f3fecaad5c9077072626cf
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Jun 1 06:17:39 2022 -0600
Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)
---
datafusion/core/src/execution/context.rs | 10 ++--
.../core/src/optimizer/common_subexpr_eliminate.rs | 29 ++++++-----
datafusion/core/src/optimizer/eliminate_filter.rs | 8 ++--
datafusion/core/src/optimizer/eliminate_limit.rs | 8 ++--
datafusion/core/src/optimizer/filter_push_down.rs | 10 ++--
datafusion/core/src/optimizer/limit_push_down.rs | 36 +++++++-------
datafusion/core/src/optimizer/optimizer.rs | 34 ++++++++++---
.../core/src/optimizer/projection_push_down.rs | 30 ++++++------
.../core/src/optimizer/simplify_expressions.rs | 56 +++++++++++++---------
.../src/optimizer/single_distinct_to_groupby.rs | 6 +--
.../core/src/optimizer/subquery_filter_to_join.rs | 12 ++---
datafusion/core/src/optimizer/utils.rs | 6 +--
datafusion/core/tests/user_defined_plan.rs | 9 ++--
13 files changed, 145 insertions(+), 109 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba3f86c69..29049fcdf 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::logical_plan::{
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
-use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
@@ -1379,7 +1379,9 @@ impl SessionState {
/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
- let execution_props = &mut self.execution_props.clone();
+ let mut optimizer_config = OptimizerConfig::new();
+ optimizer_config.query_execution_start_time =
+ self.execution_props.query_execution_start_time;
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
@@ -1387,7 +1389,7 @@ impl SessionState {
// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
- execution_props,
+ &optimizer_config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
@@ -1402,7 +1404,7 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
- self.optimizer.optimize(plan, execution_props, |_, _| {})
+ self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
}
}
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 81183f56d..fa99db2e9 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -18,7 +18,6 @@
//! Eliminate common sub-expression.
use crate::error::Result;
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Filter, Projection, Window};
use crate::logical_plan::{
col,
@@ -26,6 +25,7 @@ use crate::logical_plan::{
DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use arrow::datatypes::DataType;
use datafusion_expr::expr::GroupingSet;
@@ -60,9 +60,9 @@ impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- optimize(plan, execution_props)
+ optimize(plan, optimizer_config)
}
fn name(&self) -> &str {
@@ -83,7 +83,10 @@ impl CommonSubexprEliminate {
}
}
-fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<LogicalPlan> {
+fn optimize(
+ plan: &LogicalPlan,
+ optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
let mut expr_set = ExprSet::new();
match plan {
@@ -101,7 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
- execution_props,
+ optimizer_config,
)?;
Ok(LogicalPlan::Projection(Projection {
@@ -135,7 +138,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
- execution_props,
+ optimizer_config,
)?;
Ok(LogicalPlan::Filter(Filter {
@@ -156,7 +159,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
- execution_props,
+ optimizer_config,
)?;
Ok(LogicalPlan::Window(Window {
@@ -180,7 +183,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
schema,
- execution_props,
+ optimizer_config,
)?;
// note the reversed pop order.
let new_aggr_expr = new_expr.pop().unwrap();
@@ -202,7 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
input,
&mut expr_set,
input.schema(),
- execution_props,
+ optimizer_config,
)?;
Ok(LogicalPlan::Sort(Sort {
@@ -235,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|input_plan| optimize(input_plan, execution_props))
+ .map(|input_plan| optimize(input_plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
@@ -302,7 +305,7 @@ fn rewrite_expr(
input: &LogicalPlan,
expr_set: &mut ExprSet,
schema: &DFSchema,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
let mut affected_id = HashSet::<Identifier>::new();
@@ -327,7 +330,7 @@ fn rewrite_expr(
})
.collect::<Result<Vec<_>>>()?;
- let mut new_input = optimize(input, execution_props)?;
+ let mut new_input = optimize(input, optimizer_config)?;
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
@@ -702,7 +705,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs
index fb99a9798..a3c3e0341 100644
--- a/datafusion/core/src/optimizer/eliminate_filter.rs
+++ b/datafusion/core/src/optimizer/eliminate_filter.rs
@@ -27,7 +27,7 @@ use crate::logical_plan::plan::Filter;
use crate::logical_plan::{EmptyRelation, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
@@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter {
@@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| self.optimize(plan, execution_props))
+ .map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &plan.expressions(), &new_inputs)
@@ -88,7 +88,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs
index a7acf7ca6..b07f6f224 100644
--- a/datafusion/core/src/optimizer/eliminate_limit.rs
+++ b/datafusion/core/src/optimizer/eliminate_limit.rs
@@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::utils::from_plan;
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
@@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
@@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| self.optimize(plan, execution_props))
+ .map(|plan| self.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
@@ -79,7 +79,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index abfb15dd1..a07155c2f 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -14,10 +14,8 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
-use crate::{
- execution::context::ExecutionProps,
- optimizer::{optimizer::OptimizerRule, utils},
-};
+use crate::optimizer::optimizer::OptimizerConfig;
+use crate::optimizer::{optimizer::OptimizerRule, utils};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_expr::{
col,
@@ -530,7 +528,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}
- fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
+ fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
optimize(plan, State::default())
}
}
@@ -578,7 +576,7 @@ mod tests {
fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
- rule.optimize(plan, &ExecutionProps::new())
+ rule.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan")
}
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 725e00a00..19578b2bc 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -18,10 +18,10 @@
//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
use crate::error::Result;
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::{Join, JoinType, Offset};
@@ -44,7 +44,7 @@ fn limit_push_down(
_optimizer: &LimitPushDown,
upper_limit: Option<usize>,
plan: &LogicalPlan,
- _execution_props: &ExecutionProps,
+ _optimizer_config: &OptimizerConfig,
is_offset: bool,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
@@ -61,7 +61,7 @@ fn limit_push_down(
_optimizer,
Some(new_limit),
input.as_ref(),
- _execution_props,
+ _optimizer_config,
false,
)?),
}))
@@ -102,7 +102,7 @@ fn limit_push_down(
_optimizer,
upper_limit,
input.as_ref(),
- _execution_props,
+ _optimizer_config,
false,
)?),
schema: schema.clone(),
@@ -127,7 +127,7 @@ fn limit_push_down(
_optimizer,
Some(upper_limit),
x,
- _execution_props,
+ _optimizer_config,
false,
)?),
}))
@@ -153,7 +153,7 @@ fn limit_push_down(
_optimizer,
Some(new_limit),
input.as_ref(),
- _execution_props,
+ _optimizer_config,
true,
)?),
}))
@@ -163,7 +163,7 @@ fn limit_push_down(
//if LeftOuter join push limit to left
generate_push_down_join(
_optimizer,
- _execution_props,
+ _optimizer_config,
plan,
upper_limit,
None,
@@ -174,23 +174,23 @@ fn limit_push_down(
{
generate_push_down_join(
_optimizer,
- _execution_props,
+ _optimizer_config,
plan,
None,
upper_limit,
)
}
- _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None),
+ _ => generate_push_down_join(_optimizer, _optimizer_config, plan, None, None),
},
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
- _ => push_down_children_limit(_optimizer, _execution_props, plan),
+ _ => push_down_children_limit(_optimizer, _optimizer_config, plan),
}
}
fn generate_push_down_join(
_optimizer: &LimitPushDown,
- _execution_props: &ExecutionProps,
+ _optimizer_config: &OptimizerConfig,
join: &LogicalPlan,
left_limit: Option<usize>,
right_limit: Option<usize>,
@@ -211,14 +211,14 @@ fn generate_push_down_join(
_optimizer,
left_limit,
left.as_ref(),
- _execution_props,
+ _optimizer_config,
true,
)?),
right: Arc::new(limit_push_down(
_optimizer,
right_limit,
right.as_ref(),
- _execution_props,
+ _optimizer_config,
true,
)?),
on: on.clone(),
@@ -238,7 +238,7 @@ fn generate_push_down_join(
fn push_down_children_limit(
_optimizer: &LimitPushDown,
- _execution_props: &ExecutionProps,
+ _optimizer_config: &OptimizerConfig,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let expr = plan.expressions();
@@ -247,7 +247,7 @@ fn push_down_children_limit(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false))
+ .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
@@ -257,9 +257,9 @@ impl OptimizerRule for LimitPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- limit_push_down(self, None, plan, execution_props, false)
+ limit_push_down(self, None, plan, optimizer_config, false)
}
fn name(&self) -> &str {
@@ -280,7 +280,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = LimitPushDown::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs
index cc00eb8f8..8b602f423 100644
--- a/datafusion/core/src/optimizer/optimizer.rs
+++ b/datafusion/core/src/optimizer/optimizer.rs
@@ -17,12 +17,12 @@
//! Query optimizer traits
+use chrono::{DateTime, Utc};
use std::sync::Arc;
use log::{debug, trace};
use crate::error::Result;
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
/// `OptimizerRule` transforms one ['LogicalPlan'] into another which
@@ -33,13 +33,37 @@ pub trait OptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan>;
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
}
+/// Optimizer configuration passed to every `OptimizerRule` (currently only the query start time)
+#[derive(Debug)]
+pub struct OptimizerConfig {
+ /// Query execution start time that can be used to rewrite expressions such as `now()`
+ /// to use a literal value instead
+ pub query_execution_start_time: DateTime<Utc>,
+}
+
+impl OptimizerConfig {
+ /// Create optimizer config with `query_execution_start_time` set to the current UTC time
+ pub fn new() -> Self {
+ Self {
+ query_execution_start_time: chrono::Utc::now(),
+ }
+ }
+}
+
+impl Default for OptimizerConfig {
+ /// Create the default optimizer config; equivalent to `OptimizerConfig::new()`
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
/// A rule-based optimizer.
#[derive(Clone)]
pub struct Optimizer {
@@ -58,19 +82,17 @@ impl Optimizer {
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
- execution_props: &mut ExecutionProps,
+ optimizer_config: &OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
- let execution_props = execution_props.start_execution();
-
let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for rule in &self.rules {
- new_plan = rule.optimize(&new_plan, execution_props)?;
+ new_plan = rule.optimize(&new_plan, optimizer_config)?;
observer(&new_plan, rule.as_ref());
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 4feef4f99..977a34b53 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -19,7 +19,6 @@
//! loaded into memory
use crate::error::{DataFusionError, Result};
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{
Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window,
};
@@ -27,6 +26,7 @@ use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
LogicalPlanBuilder, ToDFSchema, Union,
};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
@@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
// set of all columns refered by the plan (and thus considered required by the root)
let required_columns = plan
@@ -57,7 +57,7 @@ impl OptimizerRule for ProjectionPushDown {
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();
- optimize_plan(self, plan, &required_columns, false, execution_props)
+ optimize_plan(self, plan, &required_columns, false, optimizer_config)
}
fn name(&self) -> &str {
@@ -132,7 +132,7 @@ fn optimize_plan(
plan: &LogicalPlan,
required_columns: &HashSet<Column>, // set of columns required up to this step
has_projection: bool,
- _execution_props: &ExecutionProps,
+ _optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
@@ -171,7 +171,7 @@ fn optimize_plan(
input,
&new_required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?;
let new_required_columns_optimized = new_input
@@ -226,7 +226,7 @@ fn optimize_plan(
left,
&new_required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?);
let optimized_right = Arc::new(optimize_plan(
@@ -234,7 +234,7 @@ fn optimize_plan(
right,
&new_required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?);
let schema = build_join_schema(
@@ -284,7 +284,7 @@ fn optimize_plan(
input,
required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?)
.build();
};
@@ -300,7 +300,7 @@ fn optimize_plan(
input,
&new_required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?)
.window(new_window_expr)?
.build()
@@ -352,7 +352,7 @@ fn optimize_plan(
input,
&new_required_columns,
true,
- _execution_props,
+ _optimizer_config,
)?),
schema: DFSchemaRef::new(new_schema),
}))
@@ -401,7 +401,7 @@ fn optimize_plan(
&a.input,
&required_columns,
false,
- _execution_props,
+ _optimizer_config,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
@@ -437,7 +437,7 @@ fn optimize_plan(
input_plan,
&new_required_columns,
has_projection,
- _execution_props,
+ _optimizer_config,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -474,7 +474,7 @@ fn optimize_plan(
input,
&new_required_columns,
has_projection,
- _execution_props,
+ _optimizer_config,
)?];
let expr = vec![];
from_plan(plan, &expr, &new_inputs)
@@ -516,7 +516,7 @@ fn optimize_plan(
input_plan,
&new_required_columns,
has_projection,
- _execution_props,
+ _optimizer_config,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -1005,6 +1005,6 @@ mod tests {
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = ProjectionPushDown::new();
- rule.optimize(plan, &ExecutionProps::new())
+ rule.optimize(plan, &OptimizerConfig::new())
}
}
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs
index 8c628906a..49a5a1295 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable,
LogicalPlan, RewriteRecursion, SimplifyInfo,
};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use crate::physical_plan::planner::create_physical_expr;
use crate::scalar::ScalarValue;
@@ -192,6 +193,19 @@ impl OptimizerRule for SimplifyExpressions {
}
fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &OptimizerConfig,
+ ) -> Result<LogicalPlan> {
+ let mut execution_props = ExecutionProps::new();
+ execution_props.query_execution_start_time =
+ optimizer_config.query_execution_start_time;
+ self.optimize_internal(plan, &execution_props)
+ }
+}
+
+impl SimplifyExpressions {
+ fn optimize_internal(
&self,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
@@ -206,7 +220,7 @@ impl OptimizerRule for SimplifyExpressions {
let new_inputs = plan
.inputs()
.iter()
- .map(|input| self.optimize(input, execution_props))
+ .map(|input| self.optimize_internal(input, execution_props))
.collect::<Result<Vec<_>>>()?;
let expr = plan
@@ -257,8 +271,8 @@ impl SimplifyExpressions {
/// # use datafusion::optimizer::simplify_expressions::ConstEvaluator;
/// # use datafusion::execution::context::ExecutionProps;
///
-/// let execution_props = ExecutionProps::new();
-/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
+/// let optimizer_config = ExecutionProps::new();
+/// let mut const_evaluator = ConstEvaluator::new(&optimizer_config);
///
/// // (1 + 2) + a
/// let expr = (lit(1) + lit(2)) + col("a");
@@ -282,7 +296,7 @@ pub struct ConstEvaluator<'a> {
/// descendants) so this Expr can be evaluated
can_evaluate: Vec<bool>,
- execution_props: &'a ExecutionProps,
+ optimizer_config: &'a ExecutionProps,
input_schema: DFSchema,
input_batch: RecordBatch,
}
@@ -328,8 +342,8 @@ impl<'a> ExprRewriter for ConstEvaluator<'a> {
impl<'a> ConstEvaluator<'a> {
/// Create a new `ConstantEvaluator`. Session constants (such as
/// the time for `now()` are taken from the passed
- /// `execution_props`.
- pub fn new(execution_props: &'a ExecutionProps) -> Self {
+ /// `optimizer_config`.
+ pub fn new(optimizer_config: &'a ExecutionProps) -> Self {
let input_schema = DFSchema::empty();
// The dummy column name is unused and doesn't matter as only
@@ -344,7 +358,7 @@ impl<'a> ConstEvaluator<'a> {
Self {
can_evaluate: vec![],
- execution_props,
+ optimizer_config,
input_schema,
input_batch,
}
@@ -410,7 +424,7 @@ impl<'a> ConstEvaluator<'a> {
&expr,
&self.input_schema,
&self.input_batch.schema(),
- self.execution_props,
+ self.optimizer_config,
)?;
let col_val = phys_expr.evaluate(&self.input_batch)?;
match col_val {
@@ -1179,12 +1193,12 @@ mod tests {
expected_expr: Expr,
date_time: &DateTime<Utc>,
) {
- let execution_props = ExecutionProps {
+ let optimizer_config = ExecutionProps {
query_execution_start_time: *date_time,
var_providers: None,
};
- let mut const_evaluator = ConstEvaluator::new(&execution_props);
+ let mut const_evaluator = ConstEvaluator::new(&optimizer_config);
let evaluated_expr = input_expr
.clone()
.rewrite(&mut const_evaluator)
@@ -1207,8 +1221,8 @@ mod tests {
fn simplify(expr: Expr) -> Expr {
let schema = expr_test_schema();
- let execution_props = ExecutionProps::new();
- let info = SimplifyContext::new(vec![&schema], &execution_props);
+ let optimizer_config = ExecutionProps::new();
+ let info = SimplifyContext::new(vec![&schema], &optimizer_config);
expr.simplify(&info).unwrap()
}
@@ -1518,7 +1532,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SimplifyExpressions::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
@@ -1736,14 +1750,12 @@ mod tests {
// expect optimizing will result in an error, returning the error string
fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
+ let mut config = OptimizerConfig::new();
+ config.query_execution_start_time = *date_time;
let rule = SimplifyExpressions::new();
- let execution_props = ExecutionProps {
- query_execution_start_time: *date_time,
- var_providers: None,
- };
let err = rule
- .optimize(plan, &execution_props)
+ .optimize(plan, &config)
.expect_err("expected optimization to fail");
err.to_string()
@@ -1753,14 +1765,12 @@ mod tests {
plan: &LogicalPlan,
date_time: &DateTime<Utc>,
) -> String {
+ let mut config = OptimizerConfig::new();
+ config.query_execution_start_time = *date_time;
let rule = SimplifyExpressions::new();
- let execution_props = ExecutionProps {
- query_execution_start_time: *date_time,
- var_providers: None,
- };
let optimized_plan = rule
- .optimize(plan, &execution_props)
+ .optimize(plan, &config)
.expect("failed to optimize plan");
return format!("{:?}", optimized_plan);
}
diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
index 1748f9af6..65458c4db 100644
--- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
@@ -18,10 +18,10 @@
//! single distinct to group by optimizer rule
use crate::error::Result;
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Aggregate, Projection};
use crate::logical_plan::ExprSchemable;
use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use datafusion_expr::utils::{columnize_expr, from_plan};
use hashbrown::HashSet;
@@ -188,7 +188,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fn optimize(
&self,
plan: &LogicalPlan,
- _execution_props: &ExecutionProps,
+ _optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
optimize(plan)
}
@@ -207,7 +207,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SingleDistinctToGroupBy::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
index 645442519..add03c72d 100644
--- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs
+++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
@@ -29,11 +29,11 @@
use std::sync::Arc;
use crate::error::{DataFusionError, Result};
-use crate::execution::context::ExecutionProps;
use crate::logical_plan::plan::{Filter, Join};
use crate::logical_plan::{
build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan,
};
+use crate::optimizer::optimizer::OptimizerConfig;
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -52,12 +52,12 @@ impl OptimizerRule for SubqueryFilterToJoin {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(Filter { predicate, input }) => {
// Apply optimizer rule to current input
- let optimized_input = self.optimize(input, execution_props)?;
+ let optimized_input = self.optimize(input, optimizer_config)?;
// Splitting filter expression into components by AND
let mut filters = vec![];
@@ -97,7 +97,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
} => {
let right_input = self.optimize(
&*subquery.subquery,
- execution_props
+ optimizer_config
)?;
let right_schema = right_input.schema();
if right_schema.fields().len() != 1 {
@@ -167,7 +167,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
}
_ => {
// Apply the optimization to all inputs of the plan
- utils::optimize_children(self, plan, execution_props)
+ utils::optimize_children(self, plan, optimizer_config)
}
}
}
@@ -201,7 +201,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SubqueryFilterToJoin::new();
let optimized_plan = rule
- .optimize(plan, &ExecutionProps::new())
+ .optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 81d4b76a2..006613328 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -18,7 +18,7 @@
//! Collection of utility functions that are leveraged by the query optimizer rules
use super::optimizer::OptimizerRule;
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
use datafusion_expr::logical_plan::Filter;
use crate::error::{DataFusionError, Result};
@@ -42,13 +42,13 @@ const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let new_inputs = plan
.inputs()
.into_iter()
- .map(|plan| optimizer.optimize(plan, execution_props))
+ .map(|plan| optimizer.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &new_exprs, &new_inputs)
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index d062cf3a3..a1bbb2419 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -86,10 +86,11 @@ use std::task::{Context, Poll};
use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
use async_trait::async_trait;
-use datafusion::execution::context::{ExecutionProps, TaskContext};
+use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::logical_plan::plan::{Extension, Sort};
use datafusion::logical_plan::{DFSchemaRef, Limit};
+use datafusion::optimizer::optimizer::OptimizerConfig;
/// Execute the specified sql and return the resulting record batches
/// pretty printed as a String.
@@ -284,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
- execution_props: &ExecutionProps,
+ optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
@@ -300,7 +301,7 @@ impl OptimizerRule for TopKOptimizerRule {
return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *n,
- input: self.optimize(input.as_ref(), execution_props)?,
+ input: self.optimize(input.as_ref(), optimizer_config)?,
expr: expr[0].clone(),
}),
}));
@@ -310,7 +311,7 @@ impl OptimizerRule for TopKOptimizerRule {
// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
- optimize_children(self, plan, execution_props)
+ optimize_children(self, plan, optimizer_config)
}
fn name(&self) -> &str {