Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/12 17:04:11 UTC

[arrow-datafusion] branch main updated: refactor: encapsulate physical optimizer rules into a struct (#6645)

This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d8a42ecf3 refactor: encapsulate physical optimizer rules into a struct (#6645)
2d8a42ecf3 is described below

commit 2d8a42ecf340125e976bcdd5a69014c53df788d6
Author: Ruihang Xia <wa...@gmail.com>
AuthorDate: Tue Jun 13 01:04:05 2023 +0800

    refactor: encapsulate physical optimizer rules into a struct (#6645)
    
    * feat: encapsulate physical optimizer rules into a struct
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    
    * remove redundant code
    
    ---------
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    Co-authored-by: jakevin <ja...@gmail.com>
---
 datafusion/core/src/execution/context.rs           | 73 ++-----------------
 .../core/src/physical_optimizer/optimizer.rs       | 83 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 67 deletions(-)
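
At a glance, the change moves the default physical optimizer rule list out of `SessionState` construction in context.rs and into a new `PhysicalOptimizer` struct in physical_optimizer/optimizer.rs, leaving `SessionState`'s public API unchanged. The following is a minimal sketch of the new type's surface, paraphrased from the hunks below; the assertions and the empty custom rule list are purely illustrative.

    use std::sync::Arc;

    use crate::physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule};

    #[test]
    fn physical_optimizer_construction() {
        // `new()` (and `Default`) install the same recommended rules, in the
        // same order, as the list previously built inline in context.rs.
        let default_optimizer = PhysicalOptimizer::new();
        assert!(!default_optimizer.rules.is_empty());

        // `with_rules` accepts a caller-supplied list; the empty vector here
        // is illustrative only.
        let custom_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![];
        let custom_optimizer = PhysicalOptimizer::with_rules(custom_rules);
        assert!(custom_optimizer.rules.is_empty());
    }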

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 6b81a39691..a4f7ba31fb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -26,10 +26,7 @@ use crate::{
     datasource::{MemTable, ViewTable},
     logical_expr::{PlanType, ToStringifiedPlan},
     optimizer::optimizer::Optimizer,
-    physical_optimizer::{
-        aggregate_statistics::AggregateStatistics, join_selection::JoinSelection,
-        optimizer::PhysicalOptimizerRule,
-    },
+    physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
 };
 use datafusion_execution::registry::SerializerRegistry;
 use datafusion_expr::{
@@ -77,13 +74,9 @@ use datafusion_sql::{
 };
 use sqlparser::dialect::dialect_from_str;
 
-use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::repartition::Repartition;
-
 use crate::config::ConfigOptions;
 use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
-use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udaf::AggregateUDF;
 use crate::physical_plan::udf::ScalarUDF;
@@ -103,10 +96,6 @@ use url::Url;
 use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
 use crate::catalog::listing_schema::ListingSchemaProvider;
 use crate::datasource::object_store::ObjectStoreUrl;
-use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
-use crate::physical_optimizer::pipeline_checker::PipelineChecker;
-use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
-use crate::physical_optimizer::sort_enforcement::EnforceSorting;
 use datafusion_optimizer::{
     analyzer::{Analyzer, AnalyzerRule},
     OptimizerConfig,
@@ -116,7 +105,6 @@ use uuid::Uuid;
 
 // backwards compatibility
 use crate::execution::options::ArrowReadOptions;
-use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
 
@@ -1331,7 +1319,7 @@ pub struct SessionState {
     /// Responsible for optimizing a logical plan
     optimizer: Optimizer,
     /// Responsible for optimizing a physical execution plan
-    physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+    physical_optimizers: PhysicalOptimizer,
     /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
     query_planner: Arc<dyn QueryPlanner + Send + Sync>,
     /// Collection of catalogs containing schemas and ultimately TableProviders
@@ -1425,60 +1413,11 @@ impl SessionState {
             );
         }
 
-        // We need to take care of the rule ordering. They may influence each other.
-        let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
-            Arc::new(AggregateStatistics::new()),
-            // Statistics-based join selection will change the Auto mode to a real join implementation,
-            // like collect left, or hash join, or future sort merge join, which will influence the
-            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
-            // repartitioning and local sorting steps to meet distribution and ordering requirements.
-            // Therefore, it should run before EnforceDistribution and EnforceSorting.
-            Arc::new(JoinSelection::new()),
-            // If the query is processing infinite inputs, the PipelineFixer rule applies the
-            // necessary transformations to make the query runnable (if it is not already runnable).
-            // If the query can not be made runnable, the rule emits an error with a diagnostic message.
-            // Since the transformations it applies may alter output partitioning properties of operators
-            // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
-            Arc::new(PipelineFixer::new()),
-            // In order to increase the parallelism, the Repartition rule will change the
-            // output partitioning of some operators in the plan tree, which will influence
-            // other rules. Therefore, it should run as soon as possible. It is optional because:
-            // - It's not used for the distributed engine, Ballista.
-            // - It's conflicted with some parts of the EnforceDistribution, since it will
-            //   introduce additional repartitioning while EnforceDistribution aims to
-            //   reduce unnecessary repartitioning.
-            Arc::new(Repartition::new()),
-            // - Currently it will depend on the partition number to decide whether to change the
-            // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
-            // should run after the Repartition.
-            // - Since it will change the output ordering of some operators, it should run
-            // before JoinSelection and EnforceSorting, which may depend on that.
-            Arc::new(GlobalSortSelection::new()),
-            // The EnforceDistribution rule is for adding essential repartition to satisfy the required
-            // distribution. Please make sure that the whole plan tree is determined before this rule.
-            Arc::new(EnforceDistribution::new()),
-            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
-            Arc::new(CombinePartialFinalAggregate::new()),
-            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
-            // ordering. Please make sure that the whole plan tree is determined before this rule.
-            // Note that one should always run this rule after running the EnforceDistribution rule
-            // as the latter may break local sorting requirements.
-            Arc::new(EnforceSorting::new()),
-            // The CoalesceBatches rule will not influence the distribution and ordering of the
-            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
-            Arc::new(CoalesceBatches::new()),
-            // The PipelineChecker rule will reject non-runnable query plans that use
-            // pipeline-breaking operators on infinite input(s). The rule generates a
-            // diagnostic error message when this happens. It makes no changes to the
-            // given query plan; i.e. it only acts as a final gatekeeping rule.
-            Arc::new(PipelineChecker::new()),
-        ];
-
         SessionState {
             session_id,
             analyzer: Analyzer::new(),
             optimizer: Optimizer::new(),
-            physical_optimizers,
+            physical_optimizers: PhysicalOptimizer::new(),
             query_planner: Arc::new(DefaultQueryPlanner {}),
             catalog_list,
             scalar_functions: HashMap::new(),
@@ -1613,7 +1552,7 @@ impl SessionState {
         mut self,
         physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
     ) -> Self {
-        self.physical_optimizers = physical_optimizers;
+        self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers);
         self
     }
 
@@ -1640,7 +1579,7 @@ impl SessionState {
         mut self,
         optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
     ) -> Self {
-        self.physical_optimizers.push(optimizer_rule);
+        self.physical_optimizers.rules.push(optimizer_rule);
         self
     }
 
@@ -1931,7 +1870,7 @@ impl SessionState {
 
     /// Return the physical optimizers
     pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
-        &self.physical_optimizers
+        &self.physical_optimizers.rules
     }
 
     /// return the configuration options
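
The `SessionState` accessors keep their old signatures, so callers that only read the rule list are unaffected by the switch to `PhysicalOptimizer`. Below is a minimal sketch of such a caller; it assumes the trait's `name` method, which is defined in optimizer.rs but not visible in the hunks above.

    use crate::execution::context::SessionState;
    use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;

    /// Print the registered physical optimizer rules in execution order.
    /// Sketch only: `name()` comes from the PhysicalOptimizerRule trait.
    fn print_rule_names(state: &SessionState) {
        for rule in state.physical_optimizers() {
            println!("physical optimizer rule: {}", rule.name());
        }
    }
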
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 26ec137e2b..b1a2de253a 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -20,6 +20,16 @@
 use std::sync::Arc;
 
 use crate::config::ConfigOptions;
+use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
+use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
+use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
+use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
+use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
+use crate::physical_optimizer::join_selection::JoinSelection;
+use crate::physical_optimizer::pipeline_checker::PipelineChecker;
+use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
+use crate::physical_optimizer::repartition::Repartition;
+use crate::physical_optimizer::sort_enforcement::EnforceSorting;
 use crate::{error::Result, physical_plan::ExecutionPlan};
 
 /// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
@@ -42,3 +52,76 @@ pub trait PhysicalOptimizerRule {
     /// and should disable the schema check.
     fn schema_check(&self) -> bool;
 }
+
+/// A rule-based physical optimizer.
+#[derive(Clone)]
+pub struct PhysicalOptimizer {
+    /// All rules to apply
+    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+}
+
+impl Default for PhysicalOptimizer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PhysicalOptimizer {
+    /// Create a new optimizer using the recommended list of rules
+    pub fn new() -> Self {
+        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+            Arc::new(AggregateStatistics::new()),
+            // Statistics-based join selection will change the Auto mode to a real join implementation,
+            // like collect left, or hash join, or future sort merge join, which will influence the
+            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
+            // repartitioning and local sorting steps to meet distribution and ordering requirements.
+            // Therefore, it should run before EnforceDistribution and EnforceSorting.
+            Arc::new(JoinSelection::new()),
+            // If the query is processing infinite inputs, the PipelineFixer rule applies the
+            // necessary transformations to make the query runnable (if it is not already runnable).
+            // If the query can not be made runnable, the rule emits an error with a diagnostic message.
+            // Since the transformations it applies may alter output partitioning properties of operators
+            // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
+            Arc::new(PipelineFixer::new()),
+            // In order to increase the parallelism, the Repartition rule will change the
+            // output partitioning of some operators in the plan tree, which will influence
+            // other rules. Therefore, it should run as soon as possible. It is optional because:
+            // - It's not used for the distributed engine, Ballista.
+            // - It's conflicted with some parts of the EnforceDistribution, since it will
+            //   introduce additional repartitioning while EnforceDistribution aims to
+            //   reduce unnecessary repartitioning.
+            Arc::new(Repartition::new()),
+            // - Currently it will depend on the partition number to decide whether to change the
+            // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
+            // should run after the Repartition.
+            // - Since it will change the output ordering of some operators, it should run
+            // before JoinSelection and EnforceSorting, which may depend on that.
+            Arc::new(GlobalSortSelection::new()),
+            // The EnforceDistribution rule is for adding essential repartition to satisfy the required
+            // distribution. Please make sure that the whole plan tree is determined before this rule.
+            Arc::new(EnforceDistribution::new()),
+            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+            Arc::new(CombinePartialFinalAggregate::new()),
+            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
+            // ordering. Please make sure that the whole plan tree is determined before this rule.
+            // Note that one should always run this rule after running the EnforceDistribution rule
+            // as the latter may break local sorting requirements.
+            Arc::new(EnforceSorting::new()),
+            // The CoalesceBatches rule will not influence the distribution and ordering of the
+            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
+            Arc::new(CoalesceBatches::new()),
+            // The PipelineChecker rule will reject non-runnable query plans that use
+            // pipeline-breaking operators on infinite input(s). The rule generates a
+            // diagnostic error message when this happens. It makes no changes to the
+            // given query plan; i.e. it only acts as a final gatekeeping rule.
+            Arc::new(PipelineChecker::new()),
+        ];
+
+        Self::with_rules(rules)
+    }
+
+    /// Create a new optimizer with the given rules
+    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
+        Self { rules }
+    }
+}
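
For downstream code, the practical effect is that a custom rule set can now be packaged as a `PhysicalOptimizer` value rather than passed around as a bare `Vec`. The sketch below defines a hypothetical pass-through rule and wraps it with `with_rules`; the `optimize` and `name` signatures are taken from the `PhysicalOptimizerRule` trait declared earlier in optimizer.rs (only `schema_check` is visible in the hunk above), so treat them as an assumption about that trait.

    use std::sync::Arc;

    use crate::config::ConfigOptions;
    use crate::error::Result;
    use crate::physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule};
    use crate::physical_plan::ExecutionPlan;

    /// A hypothetical pass-through rule, used only to illustrate the API.
    struct NoopRule;

    impl PhysicalOptimizerRule for NoopRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // Hand the plan back unchanged.
            Ok(plan)
        }

        fn name(&self) -> &str {
            "noop_rule"
        }

        fn schema_check(&self) -> bool {
            // The plan is untouched, so the schema check can stay enabled.
            true
        }
    }

    /// Package the hypothetical rule as a PhysicalOptimizer via `with_rules`.
    fn noop_only_optimizer() -> PhysicalOptimizer {
        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
            vec![Arc::new(NoopRule)];
        PhysicalOptimizer::with_rules(rules)
    }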