You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/12 17:04:11 UTC
[arrow-datafusion] branch main updated: refactor: encapsulate physical optimizer rules into a struct (#6645)
This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2d8a42ecf3 refactor: encapsulate physical optimizer rules into a struct (#6645)
2d8a42ecf3 is described below
commit 2d8a42ecf340125e976bcdd5a69014c53df788d6
Author: Ruihang Xia <wa...@gmail.com>
AuthorDate: Tue Jun 13 01:04:05 2023 +0800
refactor: encapsulate physical optimizer rules into a struct (#6645)
* feat: encapsulate physical optimizer rules into a struct
Signed-off-by: Ruihang Xia <wa...@gmail.com>
* remove redundant code
---------
Signed-off-by: Ruihang Xia <wa...@gmail.com>
Co-authored-by: jakevin <ja...@gmail.com>
---
datafusion/core/src/execution/context.rs | 73 ++-----------------
.../core/src/physical_optimizer/optimizer.rs | 83 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 67 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 6b81a39691..a4f7ba31fb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -26,10 +26,7 @@ use crate::{
datasource::{MemTable, ViewTable},
logical_expr::{PlanType, ToStringifiedPlan},
optimizer::optimizer::Optimizer,
- physical_optimizer::{
- aggregate_statistics::AggregateStatistics, join_selection::JoinSelection,
- optimizer::PhysicalOptimizerRule,
- },
+ physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
@@ -77,13 +74,9 @@ use datafusion_sql::{
};
use sqlparser::dialect::dialect_from_str;
-use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::repartition::Repartition;
-
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
-use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
@@ -103,10 +96,6 @@ use url::Url;
use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
-use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
-use crate::physical_optimizer::pipeline_checker::PipelineChecker;
-use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
-use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion_optimizer::{
analyzer::{Analyzer, AnalyzerRule},
OptimizerConfig,
@@ -116,7 +105,6 @@ use uuid::Uuid;
// backwards compatibility
use crate::execution::options::ArrowReadOptions;
-use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
@@ -1331,7 +1319,7 @@ pub struct SessionState {
/// Responsible for optimizing a logical plan
optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
- physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+ physical_optimizers: PhysicalOptimizer,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Collection of catalogs containing schemas and ultimately TableProviders
@@ -1425,60 +1413,11 @@ impl SessionState {
);
}
- // We need to take care of the rule ordering. They may influence each other.
- let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
- Arc::new(AggregateStatistics::new()),
- // Statistics-based join selection will change the Auto mode to a real join implementation,
- // like collect left, or hash join, or future sort merge join, which will influence the
- // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
- // repartitioning and local sorting steps to meet distribution and ordering requirements.
- // Therefore, it should run before EnforceDistribution and EnforceSorting.
- Arc::new(JoinSelection::new()),
- // If the query is processing infinite inputs, the PipelineFixer rule applies the
- // necessary transformations to make the query runnable (if it is not already runnable).
- // If the query can not be made runnable, the rule emits an error with a diagnostic message.
- // Since the transformations it applies may alter output partitioning properties of operators
- // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
- Arc::new(PipelineFixer::new()),
- // In order to increase the parallelism, the Repartition rule will change the
- // output partitioning of some operators in the plan tree, which will influence
- // other rules. Therefore, it should run as soon as possible. It is optional because:
- // - It's not used for the distributed engine, Ballista.
- // - It's conflicted with some parts of the EnforceDistribution, since it will
- // introduce additional repartitioning while EnforceDistribution aims to
- // reduce unnecessary repartitioning.
- Arc::new(Repartition::new()),
- // - Currently it will depend on the partition number to decide whether to change the
- // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
- // should run after the Repartition.
- // - Since it will change the output ordering of some operators, it should run
- // before JoinSelection and EnforceSorting, which may depend on that.
- Arc::new(GlobalSortSelection::new()),
- // The EnforceDistribution rule is for adding essential repartition to satisfy the required
- // distribution. Please make sure that the whole plan tree is determined before this rule.
- Arc::new(EnforceDistribution::new()),
- // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
- Arc::new(CombinePartialFinalAggregate::new()),
- // The EnforceSorting rule is for adding essential local sorting to satisfy the required
- // ordering. Please make sure that the whole plan tree is determined before this rule.
- // Note that one should always run this rule after running the EnforceDistribution rule
- // as the latter may break local sorting requirements.
- Arc::new(EnforceSorting::new()),
- // The CoalesceBatches rule will not influence the distribution and ordering of the
- // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
- Arc::new(CoalesceBatches::new()),
- // The PipelineChecker rule will reject non-runnable query plans that use
- // pipeline-breaking operators on infinite input(s). The rule generates a
- // diagnostic error message when this happens. It makes no changes to the
- // given query plan; i.e. it only acts as a final gatekeeping rule.
- Arc::new(PipelineChecker::new()),
- ];
-
SessionState {
session_id,
analyzer: Analyzer::new(),
optimizer: Optimizer::new(),
- physical_optimizers,
+ physical_optimizers: PhysicalOptimizer::new(),
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
scalar_functions: HashMap::new(),
@@ -1613,7 +1552,7 @@ impl SessionState {
mut self,
physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
) -> Self {
- self.physical_optimizers = physical_optimizers;
+ self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers);
self
}
@@ -1640,7 +1579,7 @@ impl SessionState {
mut self,
optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
) -> Self {
- self.physical_optimizers.push(optimizer_rule);
+ self.physical_optimizers.rules.push(optimizer_rule);
self
}
@@ -1931,7 +1870,7 @@ impl SessionState {
/// Return the physical optimizers
pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
- &self.physical_optimizers
+ &self.physical_optimizers.rules
}
/// return the configuration options
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 26ec137e2b..b1a2de253a 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -20,6 +20,16 @@
use std::sync::Arc;
use crate::config::ConfigOptions;
+use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
+use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
+use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
+use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
+use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
+use crate::physical_optimizer::join_selection::JoinSelection;
+use crate::physical_optimizer::pipeline_checker::PipelineChecker;
+use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
+use crate::physical_optimizer::repartition::Repartition;
+use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::{error::Result, physical_plan::ExecutionPlan};
/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which
@@ -42,3 +52,76 @@ pub trait PhysicalOptimizerRule {
/// and should disable the schema check.
fn schema_check(&self) -> bool;
}
+
+/// A rule-based physical optimizer: a list of [`PhysicalOptimizerRule`]s kept together in one struct.
+#[derive(Clone)]
+pub struct PhysicalOptimizer {
+ /// All rules to apply, held in a `Vec` so the order in which they were supplied is preserved
+ pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
+}
+
+impl Default for PhysicalOptimizer {
+ fn default() -> Self {
+ Self::new() // the default optimizer is the one built by `new()`, i.e. the recommended list of rules
+ }
+}
+
+impl PhysicalOptimizer {
+ /// Create a new optimizer using the recommended list of rules. Rule order matters: rules may influence each other.
+ pub fn new() -> Self {
+ let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ Arc::new(AggregateStatistics::new()),
+ // Statistics-based join selection will change the Auto mode to a real join implementation,
+ // like collect left, or hash join, or future sort merge join, which will influence the
+ // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
+ // repartitioning and local sorting steps to meet distribution and ordering requirements.
+ // Therefore, it should run before EnforceDistribution and EnforceSorting.
+ Arc::new(JoinSelection::new()),
+ // If the query is processing infinite inputs, the PipelineFixer rule applies the
+ // necessary transformations to make the query runnable (if it is not already runnable).
+ // If the query cannot be made runnable, the rule emits an error with a diagnostic message.
+ // Since the transformations it applies may alter output partitioning properties of operators
+ // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
+ Arc::new(PipelineFixer::new()),
+ // In order to increase the parallelism, the Repartition rule will change the
+ // output partitioning of some operators in the plan tree, which will influence
+ // other rules. Therefore, it should run as soon as possible. It is optional because:
+ // - It's not used for the distributed engine, Ballista.
+ // - It conflicts with some parts of EnforceDistribution, since it will
+ // introduce additional repartitioning while EnforceDistribution aims to
+ // reduce unnecessary repartitioning.
+ Arc::new(Repartition::new()),
+ // - GlobalSortSelection currently depends on the partition number to decide whether to
+ // change a single-node sort into a parallel local sort and merge. Therefore, it
+ // should run after the Repartition rule.
+ // - Since it will change the output ordering of some operators, it should run before JoinSelection and
+ // EnforceSorting, which may depend on that. NOTE(review): JoinSelection is listed earlier above — confirm.
+ Arc::new(GlobalSortSelection::new()),
+ // The EnforceDistribution rule is for adding essential repartition to satisfy the required
+ // distribution. Please make sure that the whole plan tree is determined before this rule.
+ Arc::new(EnforceDistribution::new()),
+ // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+ Arc::new(CombinePartialFinalAggregate::new()),
+ // The EnforceSorting rule is for adding essential local sorting to satisfy the required
+ // ordering. Please make sure that the whole plan tree is determined before this rule.
+ // Note that one should always run this rule after running the EnforceDistribution rule
+ // as the latter may break local sorting requirements.
+ Arc::new(EnforceSorting::new()),
+ // The CoalesceBatches rule will not influence the distribution and ordering of the whole plan
+ // tree. Therefore, to avoid influencing other rules, it runs last among the plan-changing rules.
+ Arc::new(CoalesceBatches::new()),
+ // The PipelineChecker rule will reject non-runnable query plans that use
+ // pipeline-breaking operators on infinite input(s). The rule generates a
+ // diagnostic error message when this happens. It makes no changes to the
+ // given query plan; i.e. it only acts as a final gatekeeping rule.
+ Arc::new(PipelineChecker::new()),
+ ];
+
+ Self::with_rules(rules)
+ }
+
+ /// Create a new optimizer holding exactly the given rules, in the given order, instead of the recommended list
+ pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
+ Self { rules }
+ }
+}