You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/28 11:58:56 UTC
[arrow-datafusion] branch master updated: Decouple physical optimizer from SessionConfig (#3887) (#4749)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d3ca9b05b Decouple physical optimizer from SessionConfig (#3887) (#4749)
d3ca9b05b is described below
commit d3ca9b05b90368fb5a8696a366d727f2b7358b4e
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Dec 28 11:58:50 2022 +0000
Decouple physical optimizer from SessionConfig (#3887) (#4749)
* Decouple physical optimizer from SessionConfig (#3887)
* Logical conflicts
---
.../src/physical_optimizer/aggregate_statistics.rs | 13 ++++++------
.../src/physical_optimizer/coalesce_batches.rs | 3 ++-
.../core/src/physical_optimizer/enforcement.rs | 16 ++++++++-------
.../physical_optimizer/global_sort_selection.rs | 4 ++--
.../core/src/physical_optimizer/join_selection.rs | 20 ++++++++----------
.../core/src/physical_optimizer/optimize_sorts.rs | 24 +++++++++++-----------
.../core/src/physical_optimizer/optimizer.rs | 7 +++----
.../core/src/physical_optimizer/repartition.rs | 14 ++++++++-----
datafusion/core/src/physical_optimizer/utils.rs | 6 +++---
datafusion/core/src/physical_plan/planner.rs | 2 +-
10 files changed, 57 insertions(+), 52 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index ff4c2190f..3014f5c54 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -18,9 +18,9 @@
//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;
+use crate::config::ConfigOptions;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
-use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
@@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
let partial_agg_exec = partial_agg_exec
@@ -307,9 +307,10 @@ mod tests {
agg: TestAggregate,
) -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let plan = Arc::new(plan) as _;
- let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
+ let optimized = AggregateStatistics::new()
+ .optimize(Arc::clone(&plan), state.config_options())?;
// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
@@ -548,7 +549,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let conf = SessionConfig::new();
+ let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
@@ -591,7 +592,7 @@ mod tests {
Arc::clone(&schema),
)?;
- let conf = SessionConfig::new();
+ let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index e0d20be16..40de861ec 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,6 +18,7 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches
+use crate::config::ConfigOptions;
use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
@@ -46,7 +47,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
- _config: &crate::execution::context::SessionConfig,
+ _config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let target_batch_size = self.target_batch_size;
plan.transform_up(&|plan| {
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 30a796191..59e7a0190 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -18,7 +18,9 @@
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
-use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
+use crate::config::{
+ ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING,
+};
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
@@ -34,7 +36,6 @@ use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
-use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -69,11 +70,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- let target_partitions = config.target_partitions();
+ let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
let top_down_join_key_reordering = config
- .config_options()
.get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
.unwrap_or_default();
let new_plan = if top_down_join_key_reordering {
@@ -1135,10 +1135,12 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
+ let mut config = ConfigOptions::new();
+ config.set_usize(OPT_TARGET_PARTITIONS, 10);
+
// run optimizer
let optimizer = BasicEnforcement {};
- let optimized = optimizer
- .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
+ let optimized = optimizer.optimize($PLAN, &config)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index a6bb8229c..81b4b59e3 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -19,13 +19,13 @@
use std::sync::Arc;
+use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
-use crate::prelude::SessionConfig;
/// Currently for a sort operator, if
/// - there are more than one input partitions
@@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &SessionConfig,
+ _config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 63e7937fe..69e9e0f4d 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -20,8 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
-use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
-use crate::execution::context::SessionConfig;
+use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::{
@@ -211,10 +210,9 @@ impl PhysicalOptimizerRule for JoinSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- session_config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- let collect_left_threshold: usize = session_config
- .config_options()
+ let collect_left_threshold: usize = config
.get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
.unwrap_or_default()
.try_into()
@@ -508,7 +506,7 @@ mod tests {
.unwrap();
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &SessionConfig::new())
+ .optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();
let swapping_projection = optimized_join
@@ -556,7 +554,7 @@ mod tests {
.unwrap();
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &SessionConfig::new())
+ .optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();
let swapping_projection = optimized_join
@@ -609,7 +607,7 @@ mod tests {
let original_schema = join.schema();
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &SessionConfig::new())
+ .optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();
let swapped_join = optimized_join
@@ -638,7 +636,7 @@ mod tests {
$EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();
let optimized = JoinSelection::new()
- .optimize(Arc::new($PLAN), &SessionConfig::new())
+ .optimize(Arc::new($PLAN), &ConfigOptions::new())
.unwrap();
let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -725,7 +723,7 @@ mod tests {
.unwrap();
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &SessionConfig::new())
+ .optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();
let swapped_join = optimized_join
@@ -950,7 +948,7 @@ mod tests {
.unwrap();
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &SessionConfig::new())
+ .optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();
if !is_swapped {
diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
index cb421b7b8..0a3be1d5b 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
@@ -25,6 +25,7 @@
//! " SortExec: [non_nullable_col@1 ASC]",
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
@@ -34,7 +35,6 @@ use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
-use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
@@ -122,7 +122,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &SessionConfig,
+ _config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Execute a post-order traversal to adjust input key ordering:
let plan_requirements = PlanWithCorrespondingSort::new(plan);
@@ -557,7 +557,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
@@ -589,7 +589,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan, &conf)?;
+ OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -608,7 +608,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
@@ -690,7 +690,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan, &conf)?;
+ OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -715,7 +715,7 @@ mod tests {
#[tokio::test]
async fn test_add_required_sort() -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
@@ -736,7 +736,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan, &conf)?;
+ OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -760,7 +760,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort1() -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
@@ -803,7 +803,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan, &conf)?;
+ OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -827,7 +827,7 @@ mod tests {
#[tokio::test]
async fn test_change_wrong_sorting() -> Result<()> {
let session_ctx = SessionContext::new();
- let conf = session_ctx.copied_config();
+ let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
@@ -865,7 +865,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan, &conf)?;
+ OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 18cbc139d..26ec137e2 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,9 +19,8 @@
use std::sync::Arc;
-use crate::{
- error::Result, execution::context::SessionConfig, physical_plan::ExecutionPlan,
-};
+use crate::config::ConfigOptions;
+use crate::{error::Result, physical_plan::ExecutionPlan};
/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
/// computes the same results, but in a potentially more efficient
@@ -31,7 +30,7 @@ pub trait PhysicalOptimizerRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>>;
/// A human readable name for this optimizer rule
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 2d3f7a0e1..66359ebf6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -19,11 +19,12 @@
use std::sync::Arc;
use super::optimizer::PhysicalOptimizerRule;
+use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS};
+use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
};
-use crate::{error::Result, execution::context::SessionConfig};
/// Optimizer that introduces repartition to introduce more
/// parallelism in the plan
@@ -207,14 +208,15 @@ impl PhysicalOptimizerRule for Repartition {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
// Don't run optimizer if target_partitions == 1
- if config.target_partitions() == 1 {
+ if target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(
- config.target_partitions(),
+ target_partitions,
plan.clone(),
plan.output_ordering().is_none(),
false,
@@ -360,8 +362,10 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
+ let mut config = ConfigOptions::new();
+ config.set_usize(OPT_TARGET_PARTITIONS, 10);
+
// run optimizer
- let config = SessionConfig::new().with_target_partitions(10);
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
// The `BasicEnforcement` is an essential rule to be applied.
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 8f1fe2d08..ae1b58815 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -18,8 +18,8 @@
//! Collection of utility functions that are leveraged by the query optimizer rules
use super::optimizer::PhysicalOptimizerRule;
-use crate::execution::context::SessionConfig;
+use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -36,12 +36,12 @@ use std::sync::Arc;
pub fn optimize_children(
optimizer: &impl PhysicalOptimizerRule,
plan: Arc<dyn ExecutionPlan>,
- session_config: &SessionConfig,
+ config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let children = plan
.children()
.iter()
- .map(|child| optimizer.optimize(Arc::clone(child), session_config))
+ .map(|child| optimizer.optimize(Arc::clone(child), config))
.collect::<Result<Vec<_>>>()?;
if children.is_empty() {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 768c42978..6aa3f627d 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1783,7 +1783,7 @@ impl DefaultPhysicalPlanner {
let mut new_plan = plan;
for optimizer in optimizers {
let before_schema = new_plan.schema();
- new_plan = optimizer.optimize(new_plan, &session_state.config)?;
+ new_plan = optimizer.optimize(new_plan, session_state.config_options())?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
return Err(DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",