You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/13 02:49:49 UTC

[arrow-datafusion] branch master updated: Add an ID generator in preparation for PR 2885 (#2887)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a5de4fe0 Add an ID generator in preparation for PR 2885 (#2887)
6a5de4fe0 is described below

commit 6a5de4fe08597896ab6375e3e4b76c5744dcfba7
Author: Brent Gardner <bg...@squarelabs.net>
AuthorDate: Tue Jul 12 20:49:44 2022 -0600

    Add an ID generator in preparation for PR 2885 (#2887)
---
 datafusion/core/src/execution/context.rs               |  5 +++--
 datafusion/core/tests/user_defined_plan.rs             |  2 +-
 datafusion/optimizer/src/common_subexpr_eliminate.rs   |  4 ++--
 datafusion/optimizer/src/eliminate_filter.rs           |  4 ++--
 datafusion/optimizer/src/eliminate_limit.rs            |  4 ++--
 datafusion/optimizer/src/filter_null_join_keys.rs      |  4 ++--
 datafusion/optimizer/src/filter_push_down.rs           |  8 ++++++--
 datafusion/optimizer/src/limit_push_down.rs            |  4 ++--
 datafusion/optimizer/src/optimizer.rs                  | 11 +++++++++--
 datafusion/optimizer/src/projection_push_down.rs       |  4 ++--
 datafusion/optimizer/src/reduce_outer_join.rs          |  4 ++--
 datafusion/optimizer/src/simplify_expressions.rs       |  8 ++++----
 datafusion/optimizer/src/single_distinct_to_groupby.rs |  4 ++--
 datafusion/optimizer/src/subquery_filter_to_join.rs    |  4 ++--
 datafusion/optimizer/src/utils.rs                      |  2 +-
 15 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9170936af..42de53a88 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1381,7 +1381,7 @@ impl SessionState {
             // optimize the child plan, capturing the output of each optimizer
             let plan = self.optimizer.optimize(
                 e.plan.as_ref(),
-                &optimizer_config,
+                &mut optimizer_config,
                 |optimized_plan, optimizer| {
                     let optimizer_name = optimizer.name().to_string();
                     let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
@@ -1396,7 +1396,8 @@ impl SessionState {
                 schema: e.schema.clone(),
             }))
         } else {
-            self.optimizer.optimize(plan, &optimizer_config, |_, _| {})
+            self.optimizer
+                .optimize(plan, &mut optimizer_config, |_, _| {})
         }
     }
 
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index a62168faf..7e0a7a600 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         // Note: this code simply looks for the pattern of a Limit followed by a
         // Sort and replaces it by a TopK node. It does not handle many
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index da7e22640..d1e3fea70 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -60,7 +60,7 @@ impl OptimizerRule for CommonSubexprEliminate {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         optimize(plan, optimizer_config)
     }
@@ -708,7 +708,7 @@ mod test {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let optimizer = CommonSubexprEliminate {};
         let optimized_plan = optimizer
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs
index 102043188..5e75e8134 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -41,7 +41,7 @@ impl OptimizerRule for EliminateFilter {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Filter(Filter {
@@ -84,7 +84,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateFilter::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index b4fc06215..25c6ac240 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -128,7 +128,7 @@ impl OptimizerRule for EliminateLimit {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
     }
@@ -152,7 +152,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateLimit::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs
index c17a8088d..7308f5419 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -38,7 +38,7 @@ impl OptimizerRule for FilterNullJoinKeys {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> datafusion_common::Result<LogicalPlan> {
         match plan {
             LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
@@ -145,7 +145,7 @@ mod tests {
 
     fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
         let rule = FilterNullJoinKeys::default();
-        rule.optimize(plan, &OptimizerConfig::new())
+        rule.optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan")
     }
 
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 3de4b5a59..c358ed534 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -616,7 +616,11 @@ impl OptimizerRule for FilterPushDown {
         "filter_push_down"
     }
 
-    fn optimize(&self, plan: &LogicalPlan, _: &OptimizerConfig) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        _: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
         optimize(plan, State::default())
     }
 }
@@ -666,7 +670,7 @@ mod tests {
 
     fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
         let rule = FilterPushDown::new();
-        rule.optimize(plan, &OptimizerConfig::new())
+        rule.optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan")
     }
 
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 5c754d833..9726be087 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -335,7 +335,7 @@ impl OptimizerRule for LimitPushDown {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
     }
@@ -358,7 +358,7 @@ mod test {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = LimitPushDown::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 0cae29814..1fee75df6 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -31,7 +31,7 @@ pub trait OptimizerRule {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan>;
 
     /// A human readable name for this optimizer rule
@@ -44,6 +44,7 @@ pub struct OptimizerConfig {
     /// Query execution start time that can be used to rewrite expressions such as `now()`
     /// to use a literal value instead
     pub query_execution_start_time: DateTime<Utc>,
+    next_id: usize,
 }
 
 impl OptimizerConfig {
@@ -51,8 +52,14 @@ impl OptimizerConfig {
     pub fn new() -> Self {
         Self {
             query_execution_start_time: chrono::Utc::now(),
+            next_id: 0, // useful for generating things like unique subquery aliases
         }
     }
+
+    pub fn next_id(&mut self) -> usize {
+        self.next_id += 1;
+        self.next_id
+    }
 }
 
 impl Default for OptimizerConfig {
@@ -80,7 +87,7 @@ impl Optimizer {
     pub fn optimize<F>(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
         mut observer: F,
     ) -> Result<LogicalPlan>
     where
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 574236a69..1dfbd817f 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -48,7 +48,7 @@ impl OptimizerRule for ProjectionPushDown {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         // set of all columns refered by the plan (and thus considered required by the root)
         let required_columns = plan
@@ -1011,6 +1011,6 @@ mod tests {
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         let rule = ProjectionPushDown::new();
-        rule.optimize(plan, &OptimizerConfig::new())
+        rule.optimize(plan, &mut OptimizerConfig::new())
     }
 }
diff --git a/datafusion/optimizer/src/reduce_outer_join.rs b/datafusion/optimizer/src/reduce_outer_join.rs
index fa6c95075..0553b6cf6 100644
--- a/datafusion/optimizer/src/reduce_outer_join.rs
+++ b/datafusion/optimizer/src/reduce_outer_join.rs
@@ -41,7 +41,7 @@ impl OptimizerRule for ReduceOuterJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         let mut nonnullable_cols: Vec<Column> = vec![];
 
@@ -367,7 +367,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = ReduceOuterJoin::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index 8974639ea..14b881cc0 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -193,7 +193,7 @@ impl OptimizerRule for SimplifyExpressions {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         let mut execution_props = ExecutionProps::new();
         execution_props.query_execution_start_time =
@@ -1545,7 +1545,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SimplifyExpressions::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
@@ -1768,7 +1768,7 @@ mod tests {
         let rule = SimplifyExpressions::new();
 
         let err = rule
-            .optimize(plan, &config)
+            .optimize(plan, &mut config)
             .expect_err("expected optimization to fail");
 
         err.to_string()
@@ -1783,7 +1783,7 @@ mod tests {
         let rule = SimplifyExpressions::new();
 
         let optimized_plan = rule
-            .optimize(plan, &config)
+            .optimize(plan, &mut config)
             .expect("failed to optimize plan");
         return format!("{:?}", optimized_plan);
     }
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 82adc994f..67ebd4aea 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -199,7 +199,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        _optimizer_config: &OptimizerConfig,
+        _optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         optimize(plan)
     }
@@ -221,7 +221,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SingleDistinctToGroupBy::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
 
         let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 6a1f1fdd1..b173e425f 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -52,7 +52,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &OptimizerConfig,
+        optimizer_config: &mut OptimizerConfig,
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Filter(Filter { predicate, input }) => {
@@ -207,7 +207,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SubqueryFilterToJoin::new();
         let optimized_plan = rule
-            .optimize(plan, &OptimizerConfig::new())
+            .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
         assert_eq!(formatted_plan, expected);
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index e0c988e07..cd70c5091 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -35,7 +35,7 @@ use std::sync::Arc;
 pub fn optimize_children(
     optimizer: &impl OptimizerRule,
     plan: &LogicalPlan,
-    optimizer_config: &OptimizerConfig,
+    optimizer_config: &mut OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let new_exprs = plan.expressions();
     let new_inputs = plan