You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/06/01 12:17:45 UTC

[arrow-datafusion] branch master updated: Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 91352518d Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)
91352518d is described below

commit 91352518dd05af9491f3fecaad5c9077072626cf
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Jun 1 06:17:39 2022 -0600

    Remove `ExecutionProps` dependency from `OptimizerRule` (#2666)
---
 datafusion/core/src/execution/context.rs           | 10 ++--
 .../core/src/optimizer/common_subexpr_eliminate.rs | 29 ++++++-----
 datafusion/core/src/optimizer/eliminate_filter.rs  |  8 ++--
 datafusion/core/src/optimizer/eliminate_limit.rs   |  8 ++--
 datafusion/core/src/optimizer/filter_push_down.rs  | 10 ++--
 datafusion/core/src/optimizer/limit_push_down.rs   | 36 +++++++-------
 datafusion/core/src/optimizer/optimizer.rs         | 34 ++++++++++---
 .../core/src/optimizer/projection_push_down.rs     | 30 ++++++------
 .../core/src/optimizer/simplify_expressions.rs     | 56 +++++++++++++---------
 .../src/optimizer/single_distinct_to_groupby.rs    |  6 +--
 .../core/src/optimizer/subquery_filter_to_join.rs  | 12 ++---
 datafusion/core/src/optimizer/utils.rs             |  6 +--
 datafusion/core/tests/user_defined_plan.rs         |  9 ++--
 13 files changed, 145 insertions(+), 109 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba3f86c69..29049fcdf 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::logical_plan::{
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::optimizer::filter_push_down::FilterPushDown;
 use crate::optimizer::limit_push_down::LimitPushDown;
-use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::simplify_expressions::SimplifyExpressions;
 use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
@@ -1379,7 +1379,9 @@ impl SessionState {
 
     /// Optimizes the logical plan by applying optimizer rules.
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let execution_props = &mut self.execution_props.clone();
+        let mut optimizer_config = OptimizerConfig::new();
+        optimizer_config.query_execution_start_time =
+            self.execution_props.query_execution_start_time;
 
         if let LogicalPlan::Explain(e) = plan {
             let mut stringified_plans = e.stringified_plans.clone();
@@ -1387,7 +1389,7 @@ impl SessionState {
             // optimize the child plan, capturing the output of each optimizer
             let plan = self.optimizer.optimize(
                 e.plan.as_ref(),
-                execution_props,
+                &optimizer_config,
                 |optimized_plan, optimizer| {
                     let optimizer_name = optimizer.name().to_string();
                     let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
@@ -1402,7 +1404,7 @@ impl SessionState {
                 schema: e.schema.clone(),
             }))
         } else {
-            self.optimizer.optimize(plan, execution_props, |_, _| {})
+            self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
         }
     }
 
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 81183f56d..fa99db2e9 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -18,7 +18,6 @@
 //! Eliminate common sub-expression.
 
 use crate::error::Result;
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{Filter, Projection, Window};
 use crate::logical_plan::{
     col,
@@ -26,6 +25,7 @@ use crate::logical_plan::{
     DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
     ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
 };
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use arrow::datatypes::DataType;
 use datafusion_expr::expr::GroupingSet;
@@ -60,9 +60,9 @@ impl OptimizerRule for CommonSubexprEliminate {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        optimize(plan, execution_props)
+        optimize(plan, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -83,7 +83,10 @@ impl CommonSubexprEliminate {
     }
 }
 
-fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<LogicalPlan> {
+fn optimize(
+    plan: &LogicalPlan,
+    optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
     let mut expr_set = ExprSet::new();
 
     match plan {
@@ -101,7 +104,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 input,
                 &mut expr_set,
                 schema,
-                execution_props,
+                optimizer_config,
             )?;
 
             Ok(LogicalPlan::Projection(Projection {
@@ -135,7 +138,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 input,
                 &mut expr_set,
                 input.schema(),
-                execution_props,
+                optimizer_config,
             )?;
 
             Ok(LogicalPlan::Filter(Filter {
@@ -156,7 +159,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 input,
                 &mut expr_set,
                 schema,
-                execution_props,
+                optimizer_config,
             )?;
 
             Ok(LogicalPlan::Window(Window {
@@ -180,7 +183,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 input,
                 &mut expr_set,
                 schema,
-                execution_props,
+                optimizer_config,
             )?;
             // note the reversed pop order.
             let new_aggr_expr = new_expr.pop().unwrap();
@@ -202,7 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
                 input,
                 &mut expr_set,
                 input.schema(),
-                execution_props,
+                optimizer_config,
             )?;
 
             Ok(LogicalPlan::Sort(Sort {
@@ -235,7 +238,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
             let inputs = plan.inputs();
             let new_inputs = inputs
                 .iter()
-                .map(|input_plan| optimize(input_plan, execution_props))
+                .map(|input_plan| optimize(input_plan, optimizer_config))
                 .collect::<Result<Vec<_>>>()?;
 
             from_plan(plan, &expr, &new_inputs)
@@ -302,7 +305,7 @@ fn rewrite_expr(
     input: &LogicalPlan,
     expr_set: &mut ExprSet,
     schema: &DFSchema,
-    execution_props: &ExecutionProps,
+    optimizer_config: &OptimizerConfig,
 ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
     let mut affected_id = HashSet::<Identifier>::new();
 
@@ -327,7 +330,7 @@ fn rewrite_expr(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    let mut new_input = optimize(input, execution_props)?;
+    let mut new_input = optimize(input, optimizer_config)?;
     if !affected_id.is_empty() {
         new_input = build_project_plan(new_input, affected_id, expr_set)?;
     }
@@ -702,7 +705,7 @@ mod test {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let optimizer = CommonSubexprEliminate {};
         let optimized_plan = optimizer
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/eliminate_filter.rs b/datafusion/core/src/optimizer/eliminate_filter.rs
index fb99a9798..a3c3e0341 100644
--- a/datafusion/core/src/optimizer/eliminate_filter.rs
+++ b/datafusion/core/src/optimizer/eliminate_filter.rs
@@ -27,7 +27,7 @@ use crate::logical_plan::plan::Filter;
 use crate::logical_plan::{EmptyRelation, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
 
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
 
 /// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
 #[derive(Default)]
@@ -44,7 +44,7 @@ impl OptimizerRule for EliminateFilter {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Filter(Filter {
@@ -65,7 +65,7 @@ impl OptimizerRule for EliminateFilter {
                 let inputs = plan.inputs();
                 let new_inputs = inputs
                     .iter()
-                    .map(|plan| self.optimize(plan, execution_props))
+                    .map(|plan| self.optimize(plan, optimizer_config))
                     .collect::<Result<Vec<_>>>()?;
 
                 from_plan(plan, &plan.expressions(), &new_inputs)
@@ -88,7 +88,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateFilter::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/eliminate_limit.rs b/datafusion/core/src/optimizer/eliminate_limit.rs
index a7acf7ca6..b07f6f224 100644
--- a/datafusion/core/src/optimizer/eliminate_limit.rs
+++ b/datafusion/core/src/optimizer/eliminate_limit.rs
@@ -22,7 +22,7 @@ use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
 use crate::optimizer::optimizer::OptimizerRule;
 use datafusion_expr::utils::from_plan;
 
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
 
 /// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
 #[derive(Default)]
@@ -39,7 +39,7 @@ impl OptimizerRule for EliminateLimit {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Limit(Limit { n, input }) if *n == 0 => {
@@ -56,7 +56,7 @@ impl OptimizerRule for EliminateLimit {
                 let inputs = plan.inputs();
                 let new_inputs = inputs
                     .iter()
-                    .map(|plan| self.optimize(plan, execution_props))
+                    .map(|plan| self.optimize(plan, optimizer_config))
                     .collect::<Result<Vec<_>>>()?;
 
                 from_plan(plan, &expr, &new_inputs)
@@ -79,7 +79,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateLimit::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index abfb15dd1..a07155c2f 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -14,10 +14,8 @@
 
 //! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
 
-use crate::{
-    execution::context::ExecutionProps,
-    optimizer::{optimizer::OptimizerRule, utils},
-};
+use crate::optimizer::optimizer::OptimizerConfig;
+use crate::optimizer::{optimizer::OptimizerRule, utils};
 use datafusion_common::{Column, DFSchema, Result};
 use datafusion_expr::{
     col,
@@ -530,7 +528,7 @@ impl OptimizerRule for FilterPushDown {
         "filter_push_down"
     }
 
-    fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
         optimize(plan, State::default())
     }
 }
@@ -578,7 +576,7 @@ mod tests {
 
     fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
         let rule = FilterPushDown::new();
-        rule.optimize(plan, &ExecutionProps::new())
+        rule.optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan")
     }
 
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 725e00a00..19578b2bc 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -18,10 +18,10 @@
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
 use crate::error::Result;
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{Limit, TableScan};
 use crate::logical_plan::{LogicalPlan, Union};
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::{Join, JoinType, Offset};
@@ -44,7 +44,7 @@ fn limit_push_down(
     _optimizer: &LimitPushDown,
     upper_limit: Option<usize>,
     plan: &LogicalPlan,
-    _execution_props: &ExecutionProps,
+    _optimizer_config: &OptimizerConfig,
     is_offset: bool,
 ) -> Result<LogicalPlan> {
     match (plan, upper_limit) {
@@ -61,7 +61,7 @@ fn limit_push_down(
                     _optimizer,
                     Some(new_limit),
                     input.as_ref(),
-                    _execution_props,
+                    _optimizer_config,
                     false,
                 )?),
             }))
@@ -102,7 +102,7 @@ fn limit_push_down(
                     _optimizer,
                     upper_limit,
                     input.as_ref(),
-                    _execution_props,
+                    _optimizer_config,
                     false,
                 )?),
                 schema: schema.clone(),
@@ -127,7 +127,7 @@ fn limit_push_down(
                             _optimizer,
                             Some(upper_limit),
                             x,
-                            _execution_props,
+                            _optimizer_config,
                             false,
                         )?),
                     }))
@@ -153,7 +153,7 @@ fn limit_push_down(
                     _optimizer,
                     Some(new_limit),
                     input.as_ref(),
-                    _execution_props,
+                    _optimizer_config,
                     true,
                 )?),
             }))
@@ -163,7 +163,7 @@ fn limit_push_down(
                 //if LeftOuter join push limit to left
                 generate_push_down_join(
                     _optimizer,
-                    _execution_props,
+                    _optimizer_config,
                     plan,
                     upper_limit,
                     None,
@@ -174,23 +174,23 @@ fn limit_push_down(
             {
                 generate_push_down_join(
                     _optimizer,
-                    _execution_props,
+                    _optimizer_config,
                     plan,
                     None,
                     upper_limit,
                 )
             }
-            _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None),
+            _ => generate_push_down_join(_optimizer, _optimizer_config, plan, None, None),
         },
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
-        _ => push_down_children_limit(_optimizer, _execution_props, plan),
+        _ => push_down_children_limit(_optimizer, _optimizer_config, plan),
     }
 }
 
 fn generate_push_down_join(
     _optimizer: &LimitPushDown,
-    _execution_props: &ExecutionProps,
+    _optimizer_config: &OptimizerConfig,
     join: &LogicalPlan,
     left_limit: Option<usize>,
     right_limit: Option<usize>,
@@ -211,14 +211,14 @@ fn generate_push_down_join(
                 _optimizer,
                 left_limit,
                 left.as_ref(),
-                _execution_props,
+                _optimizer_config,
                 true,
             )?),
             right: Arc::new(limit_push_down(
                 _optimizer,
                 right_limit,
                 right.as_ref(),
-                _execution_props,
+                _optimizer_config,
                 true,
             )?),
             on: on.clone(),
@@ -238,7 +238,7 @@ fn generate_push_down_join(
 
 fn push_down_children_limit(
     _optimizer: &LimitPushDown,
-    _execution_props: &ExecutionProps,
+    _optimizer_config: &OptimizerConfig,
     plan: &LogicalPlan,
 ) -> Result<LogicalPlan> {
     let expr = plan.expressions();
@@ -247,7 +247,7 @@ fn push_down_children_limit(
     let inputs = plan.inputs();
     let new_inputs = inputs
         .iter()
-        .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false))
+        .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false))
         .collect::<Result<Vec<_>>>()?;
 
     from_plan(plan, &expr, &new_inputs)
@@ -257,9 +257,9 @@ impl OptimizerRule for LimitPushDown {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        limit_push_down(self, None, plan, execution_props, false)
+        limit_push_down(self, None, plan, optimizer_config, false)
     }
 
     fn name(&self) -> &str {
@@ -280,7 +280,7 @@ mod test {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = LimitPushDown::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs
index cc00eb8f8..8b602f423 100644
--- a/datafusion/core/src/optimizer/optimizer.rs
+++ b/datafusion/core/src/optimizer/optimizer.rs
@@ -17,12 +17,12 @@
 
 //! Query optimizer traits
 
+use chrono::{DateTime, Utc};
 use std::sync::Arc;
 
 use log::{debug, trace};
 
 use crate::error::Result;
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::LogicalPlan;
 
 /// `OptimizerRule` transforms one ['LogicalPlan'] into another which
@@ -33,13 +33,37 @@ pub trait OptimizerRule {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan>;
 
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
 }
 
+/// Placeholder for optimizer configuration options
+#[derive(Debug)]
+pub struct OptimizerConfig {
+    /// Query execution start time that can be used to rewrite expressions such as `now()`
+    /// to use a literal value instead
+    pub query_execution_start_time: DateTime<Utc>,
+}
+
+impl OptimizerConfig {
+    /// Create optimizer config
+    pub fn new() -> Self {
+        Self {
+            query_execution_start_time: chrono::Utc::now(),
+        }
+    }
+}
+
+impl Default for OptimizerConfig {
+    /// Create optimizer config
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 /// A rule-based optimizer.
 #[derive(Clone)]
 pub struct Optimizer {
@@ -58,19 +82,17 @@ impl Optimizer {
     pub fn optimize<F>(
         &self,
         plan: &LogicalPlan,
-        execution_props: &mut ExecutionProps,
+        optimizer_config: &OptimizerConfig,
         mut observer: F,
     ) -> Result<LogicalPlan>
     where
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
-        let execution_props = execution_props.start_execution();
-
         let mut new_plan = plan.clone();
         debug!("Input logical plan:\n{}\n", plan.display_indent());
         trace!("Full input logical plan:\n{:?}", plan);
         for rule in &self.rules {
-            new_plan = rule.optimize(&new_plan, execution_props)?;
+            new_plan = rule.optimize(&new_plan, optimizer_config)?;
             observer(&new_plan, rule.as_ref());
         }
         debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 4feef4f99..977a34b53 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -19,7 +19,6 @@
 //! loaded into memory
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{
     Aggregate, Analyze, Join, Projection, SubqueryAlias, TableScan, Window,
 };
@@ -27,6 +26,7 @@ use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
     LogicalPlanBuilder, ToDFSchema, Union,
 };
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::Result as ArrowResult;
@@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         // set of all columns refered by the plan (and thus considered required by the root)
         let required_columns = plan
@@ -57,7 +57,7 @@ impl OptimizerRule for ProjectionPushDown {
             .iter()
             .map(|f| f.qualified_column())
             .collect::<HashSet<Column>>();
-        optimize_plan(self, plan, &required_columns, false, execution_props)
+        optimize_plan(self, plan, &required_columns, false, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -132,7 +132,7 @@ fn optimize_plan(
     plan: &LogicalPlan,
     required_columns: &HashSet<Column>, // set of columns required up to this step
     has_projection: bool,
-    _execution_props: &ExecutionProps,
+    _optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let mut new_required_columns = required_columns.clone();
     match plan {
@@ -171,7 +171,7 @@ fn optimize_plan(
                 input,
                 &new_required_columns,
                 true,
-                _execution_props,
+                _optimizer_config,
             )?;
 
             let new_required_columns_optimized = new_input
@@ -226,7 +226,7 @@ fn optimize_plan(
                 left,
                 &new_required_columns,
                 true,
-                _execution_props,
+                _optimizer_config,
             )?);
 
             let optimized_right = Arc::new(optimize_plan(
@@ -234,7 +234,7 @@ fn optimize_plan(
                 right,
                 &new_required_columns,
                 true,
-                _execution_props,
+                _optimizer_config,
             )?);
 
             let schema = build_join_schema(
@@ -284,7 +284,7 @@ fn optimize_plan(
                     input,
                     required_columns,
                     true,
-                    _execution_props,
+                    _optimizer_config,
                 )?)
                 .build();
             };
@@ -300,7 +300,7 @@ fn optimize_plan(
                 input,
                 &new_required_columns,
                 true,
-                _execution_props,
+                _optimizer_config,
             )?)
             .window(new_window_expr)?
             .build()
@@ -352,7 +352,7 @@ fn optimize_plan(
                     input,
                     &new_required_columns,
                     true,
-                    _execution_props,
+                    _optimizer_config,
                 )?),
                 schema: DFSchemaRef::new(new_schema),
             }))
@@ -401,7 +401,7 @@ fn optimize_plan(
                     &a.input,
                     &required_columns,
                     false,
-                    _execution_props,
+                    _optimizer_config,
                 )?),
                 verbose: a.verbose,
                 schema: a.schema.clone(),
@@ -437,7 +437,7 @@ fn optimize_plan(
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        _execution_props,
+                        _optimizer_config,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -474,7 +474,7 @@ fn optimize_plan(
                         input,
                         &new_required_columns,
                         has_projection,
-                        _execution_props,
+                        _optimizer_config,
                     )?];
                     let expr = vec![];
                     from_plan(plan, &expr, &new_inputs)
@@ -516,7 +516,7 @@ fn optimize_plan(
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        _execution_props,
+                        _optimizer_config,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -1005,6 +1005,6 @@ mod tests {
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         let rule = ProjectionPushDown::new();
-        rule.optimize(plan, &ExecutionProps::new())
+        rule.optimize(plan, &OptimizerConfig::new())
     }
 }
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs
index 8c628906a..49a5a1295 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
     lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable,
     LogicalPlan, RewriteRecursion, SimplifyInfo,
 };
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
@@ -192,6 +193,19 @@ impl OptimizerRule for SimplifyExpressions {
     }
 
     fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut execution_props = ExecutionProps::new();
+        execution_props.query_execution_start_time =
+            optimizer_config.query_execution_start_time;
+        self.optimize_internal(plan, &execution_props)
+    }
+}
+
+impl SimplifyExpressions {
+    fn optimize_internal(
         &self,
         plan: &LogicalPlan,
         execution_props: &ExecutionProps,
@@ -206,7 +220,7 @@ impl OptimizerRule for SimplifyExpressions {
         let new_inputs = plan
             .inputs()
             .iter()
-            .map(|input| self.optimize(input, execution_props))
+            .map(|input| self.optimize_internal(input, execution_props))
             .collect::<Result<Vec<_>>>()?;
 
         let expr = plan
@@ -257,8 +271,8 @@ impl SimplifyExpressions {
 /// # use datafusion::optimizer::simplify_expressions::ConstEvaluator;
 /// # use datafusion::execution::context::ExecutionProps;
 ///
-/// let execution_props = ExecutionProps::new();
-/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
+/// let optimizer_config = ExecutionProps::new();
+/// let mut const_evaluator = ConstEvaluator::new(&optimizer_config);
 ///
 /// // (1 + 2) + a
 /// let expr = (lit(1) + lit(2)) + col("a");
@@ -282,7 +296,7 @@ pub struct ConstEvaluator<'a> {
     /// descendants) so this Expr can be evaluated
     can_evaluate: Vec<bool>,
 
-    execution_props: &'a ExecutionProps,
+    optimizer_config: &'a ExecutionProps,
     input_schema: DFSchema,
     input_batch: RecordBatch,
 }
@@ -328,8 +342,8 @@ impl<'a> ExprRewriter for ConstEvaluator<'a> {
 impl<'a> ConstEvaluator<'a> {
     /// Create a new `ConstantEvaluator`. Session constants (such as
     /// the time for `now()` are taken from the passed
-    /// `execution_props`.
-    pub fn new(execution_props: &'a ExecutionProps) -> Self {
+    /// `optimizer_config`.
+    pub fn new(optimizer_config: &'a ExecutionProps) -> Self {
         let input_schema = DFSchema::empty();
 
         // The dummy column name is unused and doesn't matter as only
@@ -344,7 +358,7 @@ impl<'a> ConstEvaluator<'a> {
 
         Self {
             can_evaluate: vec![],
-            execution_props,
+            optimizer_config,
             input_schema,
             input_batch,
         }
@@ -410,7 +424,7 @@ impl<'a> ConstEvaluator<'a> {
             &expr,
             &self.input_schema,
             &self.input_batch.schema(),
-            self.execution_props,
+            self.optimizer_config,
         )?;
         let col_val = phys_expr.evaluate(&self.input_batch)?;
         match col_val {
@@ -1179,12 +1193,12 @@ mod tests {
         expected_expr: Expr,
         date_time: &DateTime<Utc>,
     ) {
-        let execution_props = ExecutionProps {
+        let optimizer_config = ExecutionProps {
             query_execution_start_time: *date_time,
             var_providers: None,
         };
 
-        let mut const_evaluator = ConstEvaluator::new(&execution_props);
+        let mut const_evaluator = ConstEvaluator::new(&optimizer_config);
         let evaluated_expr = input_expr
             .clone()
             .rewrite(&mut const_evaluator)
@@ -1207,8 +1221,8 @@ mod tests {
 
     fn simplify(expr: Expr) -> Expr {
         let schema = expr_test_schema();
-        let execution_props = ExecutionProps::new();
-        let info = SimplifyContext::new(vec![&schema], &execution_props);
+        let optimizer_config = ExecutionProps::new();
+        let info = SimplifyContext::new(vec![&schema], &optimizer_config);
         expr.simplify(&info).unwrap()
     }
 
@@ -1518,7 +1532,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SimplifyExpressions::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
@@ -1736,14 +1750,12 @@ mod tests {
 
     // expect optimizing will result in an error, returning the error string
     fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
+        let mut config = OptimizerConfig::new();
+        config.query_execution_start_time = *date_time;
         let rule = SimplifyExpressions::new();
-        let execution_props = ExecutionProps {
-            query_execution_start_time: *date_time,
-            var_providers: None,
-        };
 
         let err = rule
-            .optimize(plan, &execution_props)
+            .optimize(plan, &config)
             .expect_err("expected optimization to fail");
 
         err.to_string()
@@ -1753,14 +1765,12 @@ mod tests {
         plan: &LogicalPlan,
         date_time: &DateTime<Utc>,
     ) -> String {
+        let mut config = OptimizerConfig::new();
+        config.query_execution_start_time = *date_time;
         let rule = SimplifyExpressions::new();
-        let execution_props = ExecutionProps {
-            query_execution_start_time: *date_time,
-            var_providers: None,
-        };
 
         let optimized_plan = rule
-            .optimize(plan, &execution_props)
+            .optimize(plan, &config)
             .expect("failed to optimize plan");
         return format!("{:?}", optimized_plan);
     }
diff --git a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
index 1748f9af6..65458c4db 100644
--- a/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/core/src/optimizer/single_distinct_to_groupby.rs
@@ -18,10 +18,10 @@
 //! single distinct to group by optimizer rule
 
 use crate::error::Result;
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{Aggregate, Projection};
 use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{col, DFSchema, Expr, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use datafusion_expr::utils::{columnize_expr, from_plan};
 use hashbrown::HashSet;
@@ -188,7 +188,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        _execution_props: &ExecutionProps,
+        _optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         optimize(plan)
     }
@@ -207,7 +207,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SingleDistinctToGroupBy::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/subquery_filter_to_join.rs b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
index 645442519..add03c72d 100644
--- a/datafusion/core/src/optimizer/subquery_filter_to_join.rs
+++ b/datafusion/core/src/optimizer/subquery_filter_to_join.rs
@@ -29,11 +29,11 @@
 use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::ExecutionProps;
 use crate::logical_plan::plan::{Filter, Join};
 use crate::logical_plan::{
     build_join_schema, Expr, JoinConstraint, JoinType, LogicalPlan,
 };
+use crate::optimizer::optimizer::OptimizerConfig;
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
 
@@ -52,12 +52,12 @@ impl OptimizerRule for SubqueryFilterToJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Filter(Filter { predicate, input }) => {
                 // Apply optimizer rule to current input
-                let optimized_input = self.optimize(input, execution_props)?;
+                let optimized_input = self.optimize(input, optimizer_config)?;
 
                 // Splitting filter expression into components by AND
                 let mut filters = vec![];
@@ -97,7 +97,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
                         } => {
                             let right_input = self.optimize(
                                 &*subquery.subquery,
-                                execution_props
+                                optimizer_config
                             )?;
                             let right_schema = right_input.schema();
                             if right_schema.fields().len() != 1 {
@@ -167,7 +167,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                utils::optimize_children(self, plan, execution_props)
+                utils::optimize_children(self, plan, optimizer_config)
             }
         }
     }
@@ -201,7 +201,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SubqueryFilterToJoin::new();
         let optimized_plan = rule
-            .optimize(plan, &ExecutionProps::new())
+            .optimize(plan, &OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 81d4b76a2..006613328 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -18,7 +18,7 @@
 //! Collection of utility functions that are leveraged by the query optimizer rules
 
 use super::optimizer::OptimizerRule;
-use crate::execution::context::ExecutionProps;
+use crate::optimizer::optimizer::OptimizerConfig;
 use datafusion_expr::logical_plan::Filter;
 
 use crate::error::{DataFusionError, Result};
@@ -42,13 +42,13 @@ const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";
 pub fn optimize_children(
     optimizer: &impl OptimizerRule,
     plan: &LogicalPlan,
-    execution_props: &ExecutionProps,
+    optimizer_config: &OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let new_exprs = plan.expressions();
     let new_inputs = plan
         .inputs()
         .into_iter()
-        .map(|plan| optimizer.optimize(plan, execution_props))
+        .map(|plan| optimizer.optimize(plan, optimizer_config))
         .collect::<Result<Vec<_>>>()?;
 
     from_plan(plan, &new_exprs, &new_inputs)
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index d062cf3a3..a1bbb2419 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -86,10 +86,11 @@ use std::task::{Context, Poll};
 use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
 
 use async_trait::async_trait;
-use datafusion::execution::context::{ExecutionProps, TaskContext};
+use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::logical_plan::plan::{Extension, Sort};
 use datafusion::logical_plan::{DFSchemaRef, Limit};
+use datafusion::optimizer::optimizer::OptimizerConfig;
 
 /// Execute the specified sql and return the resulting record batches
 /// pretty printed as a String.
@@ -284,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        execution_props: &ExecutionProps,
+        optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
         // Note: this code simply looks for the pattern of a Limit followed by a
         // Sort and replaces it by a TopK node. It does not handle many
@@ -300,7 +301,7 @@ impl OptimizerRule for TopKOptimizerRule {
                     return Ok(LogicalPlan::Extension(Extension {
                         node: Arc::new(TopKPlanNode {
                             k: *n,
-                            input: self.optimize(input.as_ref(), execution_props)?,
+                            input: self.optimize(input.as_ref(), optimizer_config)?,
                             expr: expr[0].clone(),
                         }),
                     }));
@@ -310,7 +311,7 @@ impl OptimizerRule for TopKOptimizerRule {
 
         // If we didn't find the Limit/Sort combination, recurse as
         // normal and build the result.
-        optimize_children(self, plan, execution_props)
+        optimize_children(self, plan, optimizer_config)
     }
 
     fn name(&self) -> &str {