You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/28 11:58:56 UTC

[arrow-datafusion] branch master updated: Decouple physical optimizer from SessionConfig (#3887) (#4749)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ca9b05b Decouple physical optimizer from SessionConfig (#3887) (#4749)
d3ca9b05b is described below

commit d3ca9b05b90368fb5a8696a366d727f2b7358b4e
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Dec 28 11:58:50 2022 +0000

    Decouple physical optimizer from SessionConfig (#3887) (#4749)
    
    * Decouple physical optimizer from SessionConfig (#3887)
    
    * Logical conflicts
---
 .../src/physical_optimizer/aggregate_statistics.rs | 13 ++++++------
 .../src/physical_optimizer/coalesce_batches.rs     |  3 ++-
 .../core/src/physical_optimizer/enforcement.rs     | 16 ++++++++-------
 .../physical_optimizer/global_sort_selection.rs    |  4 ++--
 .../core/src/physical_optimizer/join_selection.rs  | 20 ++++++++----------
 .../core/src/physical_optimizer/optimize_sorts.rs  | 24 +++++++++++-----------
 .../core/src/physical_optimizer/optimizer.rs       |  7 +++----
 .../core/src/physical_optimizer/repartition.rs     | 14 ++++++++-----
 datafusion/core/src/physical_optimizer/utils.rs    |  6 +++---
 datafusion/core/src/physical_plan/planner.rs       |  2 +-
 10 files changed, 57 insertions(+), 52 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index ff4c2190f..3014f5c54 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -18,9 +18,9 @@
 //! Utilizing exact statistics from sources to avoid scanning data
 use std::sync::Arc;
 
+use crate::config::ConfigOptions;
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 
-use crate::execution::context::SessionConfig;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use crate::physical_plan::empty::EmptyExec;
 use crate::physical_plan::projection::ProjectionExec;
@@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &SessionConfig,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if let Some(partial_agg_exec) = take_optimizable(&*plan) {
             let partial_agg_exec = partial_agg_exec
@@ -307,9 +307,10 @@ mod tests {
         agg: TestAggregate,
     ) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let plan = Arc::new(plan) as _;
-        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
+        let optimized = AggregateStatistics::new()
+            .optimize(Arc::clone(&plan), state.config_options())?;
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
@@ -548,7 +549,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let conf = SessionConfig::new();
+        let conf = ConfigOptions::new();
         let optimized =
             AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
 
@@ -591,7 +592,7 @@ mod tests {
             Arc::clone(&schema),
         )?;
 
-        let conf = SessionConfig::new();
+        let conf = ConfigOptions::new();
         let optimized =
             AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
 
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index e0d20be16..40de861ec 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,6 +18,7 @@
 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches
 
+use crate::config::ConfigOptions;
 use crate::{
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
@@ -46,7 +47,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
     fn optimize(
         &self,
         plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        _config: &crate::execution::context::SessionConfig,
+        _config: &ConfigOptions,
     ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
         let target_batch_size = self.target_batch_size;
         plan.transform_up(&|plan| {
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 30a796191..59e7a0190 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -18,7 +18,9 @@
 //! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
 //! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
 //!
-use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
+use crate::config::{
+    ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING,
+};
 use crate::error::Result;
 use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
 use crate::physical_optimizer::PhysicalOptimizerRule;
@@ -34,7 +36,6 @@ use crate::physical_plan::sorts::sort::SortOptions;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
-use crate::prelude::SessionConfig;
 use arrow::datatypes::SchemaRef;
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -69,11 +70,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &SessionConfig,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.target_partitions();
+        let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
         let top_down_join_key_reordering = config
-            .config_options()
             .get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
             .unwrap_or_default();
         let new_plan = if top_down_join_key_reordering {
@@ -1135,10 +1135,12 @@ mod tests {
         ($EXPECTED_LINES: expr, $PLAN: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
+            let mut config = ConfigOptions::new();
+            config.set_usize(OPT_TARGET_PARTITIONS, 10);
+
             // run optimizer
             let optimizer = BasicEnforcement {};
-            let optimized = optimizer
-                .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
+            let optimized = optimizer.optimize($PLAN, &config)?;
 
             // Now format correctly
             let plan = displayable(optimized.as_ref()).indent().to_string();
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index a6bb8229c..81b4b59e3 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -19,13 +19,13 @@
 
 use std::sync::Arc;
 
+use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::ExecutionPlan;
-use crate::prelude::SessionConfig;
 
 /// Currently for a sort operator, if
 /// - there are more than one input partitions
@@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &SessionConfig,
+        _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_up(&|plan| {
             Ok(plan
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 63e7937fe..69e9e0f4d 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -20,8 +20,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 
-use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
-use crate::execution::context::SessionConfig;
+use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD};
 use crate::logical_expr::JoinType;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::joins::{
@@ -211,10 +210,9 @@ impl PhysicalOptimizerRule for JoinSelection {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        session_config: &SessionConfig,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let collect_left_threshold: usize = session_config
-            .config_options()
+        let collect_left_threshold: usize = config
             .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
             .unwrap_or_default()
             .try_into()
@@ -508,7 +506,7 @@ mod tests {
         .unwrap();
 
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
+            .optimize(Arc::new(join), &ConfigOptions::new())
             .unwrap();
 
         let swapping_projection = optimized_join
@@ -556,7 +554,7 @@ mod tests {
         .unwrap();
 
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
+            .optimize(Arc::new(join), &ConfigOptions::new())
             .unwrap();
 
         let swapping_projection = optimized_join
@@ -609,7 +607,7 @@ mod tests {
             let original_schema = join.schema();
 
             let optimized_join = JoinSelection::new()
-                .optimize(Arc::new(join), &SessionConfig::new())
+                .optimize(Arc::new(join), &ConfigOptions::new())
                 .unwrap();
 
             let swapped_join = optimized_join
@@ -638,7 +636,7 @@ mod tests {
                 $EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
 
             let optimized = JoinSelection::new()
-                .optimize(Arc::new($PLAN), &SessionConfig::new())
+                .optimize(Arc::new($PLAN), &ConfigOptions::new())
                 .unwrap();
 
             let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -725,7 +723,7 @@ mod tests {
         .unwrap();
 
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
+            .optimize(Arc::new(join), &ConfigOptions::new())
             .unwrap();
 
         let swapped_join = optimized_join
@@ -950,7 +948,7 @@ mod tests {
         .unwrap();
 
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &SessionConfig::new())
+            .optimize(Arc::new(join), &ConfigOptions::new())
             .unwrap();
 
         if !is_swapped {
diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
index cb421b7b8..0a3be1d5b 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
@@ -25,6 +25,7 @@
 //! "  SortExec: [non_nullable_col@1 ASC]",
 //! in the physical plan. The first sort is unnecessary since its result is overwritten
 //! by another SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::utils::{
     add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
@@ -34,7 +35,6 @@ use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
-use crate::prelude::SessionConfig;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{reverse_sort_options, DataFusionError};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
@@ -122,7 +122,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &SessionConfig,
+        _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);
@@ -557,7 +557,7 @@ mod tests {
     #[tokio::test]
     async fn test_remove_unnecessary_sort() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let schema = create_test_schema()?;
         let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
             as Arc<dyn ExecutionPlan>;
@@ -589,7 +589,7 @@ mod tests {
             expected, actual
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, &conf)?;
+            OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -608,7 +608,7 @@ mod tests {
     #[tokio::test]
     async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let schema = create_test_schema()?;
         let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
             as Arc<dyn ExecutionPlan>;
@@ -690,7 +690,7 @@ mod tests {
             expected, actual
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, &conf)?;
+            OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -715,7 +715,7 @@ mod tests {
     #[tokio::test]
     async fn test_add_required_sort() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let schema = create_test_schema()?;
         let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
             as Arc<dyn ExecutionPlan>;
@@ -736,7 +736,7 @@ mod tests {
             expected, actual
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, &conf)?;
+            OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -760,7 +760,7 @@ mod tests {
     #[tokio::test]
     async fn test_remove_unnecessary_sort1() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let schema = create_test_schema()?;
         let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
             as Arc<dyn ExecutionPlan>;
@@ -803,7 +803,7 @@ mod tests {
             expected, actual
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, &conf)?;
+            OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -827,7 +827,7 @@ mod tests {
     #[tokio::test]
     async fn test_change_wrong_sorting() -> Result<()> {
         let session_ctx = SessionContext::new();
-        let conf = session_ctx.copied_config();
+        let state = session_ctx.state();
         let schema = create_test_schema()?;
         let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
             as Arc<dyn ExecutionPlan>;
@@ -865,7 +865,7 @@ mod tests {
             expected, actual
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, &conf)?;
+            OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 18cbc139d..26ec137e2 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,9 +19,8 @@
 
 use std::sync::Arc;
 
-use crate::{
-    error::Result, execution::context::SessionConfig, physical_plan::ExecutionPlan,
-};
+use crate::config::ConfigOptions;
+use crate::{error::Result, physical_plan::ExecutionPlan};
 
 /// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
 /// computes the same results, but in a potentially more efficient
@@ -31,7 +30,7 @@ pub trait PhysicalOptimizerRule {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &SessionConfig,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
     /// A human readable name for this optimizer rule
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 2d3f7a0e1..66359ebf6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -19,11 +19,12 @@
 use std::sync::Arc;
 
 use super::optimizer::PhysicalOptimizerRule;
+use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS};
+use crate::error::Result;
 use crate::physical_plan::Partitioning::*;
 use crate::physical_plan::{
     repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
 };
-use crate::{error::Result, execution::context::SessionConfig};
 
 /// Optimizer that introduces repartition to introduce more
 /// parallelism in the plan
@@ -207,14 +208,15 @@ impl PhysicalOptimizerRule for Repartition {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &SessionConfig,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
         // Don't run optimizer if target_partitions == 1
-        if config.target_partitions() == 1 {
+        if target_partitions == 1 {
             Ok(plan)
         } else {
             optimize_partitions(
-                config.target_partitions(),
+                target_partitions,
                 plan.clone(),
                 plan.output_ordering().is_none(),
                 false,
@@ -360,8 +362,10 @@ mod tests {
         ($EXPECTED_LINES: expr, $PLAN: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
+            let mut config = ConfigOptions::new();
+            config.set_usize(OPT_TARGET_PARTITIONS, 10);
+
             // run optimizer
-            let config = SessionConfig::new().with_target_partitions(10);
             let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
                 Arc::new(Repartition::new()),
                 // The `BasicEnforcement` is an essential rule to be applied.
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 8f1fe2d08..ae1b58815 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -18,8 +18,8 @@
 //! Collection of utility functions that are leveraged by the query optimizer rules
 
 use super::optimizer::PhysicalOptimizerRule;
-use crate::execution::context::SessionConfig;
 
+use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -36,12 +36,12 @@ use std::sync::Arc;
 pub fn optimize_children(
     optimizer: &impl PhysicalOptimizerRule,
     plan: Arc<dyn ExecutionPlan>,
-    session_config: &SessionConfig,
+    config: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let children = plan
         .children()
         .iter()
-        .map(|child| optimizer.optimize(Arc::clone(child), session_config))
+        .map(|child| optimizer.optimize(Arc::clone(child), config))
         .collect::<Result<Vec<_>>>()?;
 
     if children.is_empty() {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 768c42978..6aa3f627d 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1783,7 +1783,7 @@ impl DefaultPhysicalPlanner {
         let mut new_plan = plan;
         for optimizer in optimizers {
             let before_schema = new_plan.schema();
-            new_plan = optimizer.optimize(new_plan, &session_state.config)?;
+            new_plan = optimizer.optimize(new_plan, session_state.config_options())?;
             if optimizer.schema_check() && new_plan.schema() != before_schema {
                 return Err(DataFusionError::Internal(format!(
                         "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",