You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/06 21:00:00 UTC
[arrow-datafusion] branch main updated: Moving PipelineFixer above all rules to use ExecutionPlan APIs (#5880)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4f40070b0c Moving PipelineFixer above all rules to use ExecutionPlan APIs (#5880)
4f40070b0c is described below
commit 4f40070b0c76cc0618c402f435e0e69e1ba2a715
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Thu Apr 6 23:59:53 2023 +0300
Moving PipelineFixer above all rules to use ExecutionPlan APIs (#5880)
* Increase optimizer performance
* Config added.
* Simplifications and comment improvements
* More simplifications
* Revamping tests for unbounded-unbounded cases.
* Review code
* Move SHJ suitability from PipelineFixer to PipelineChecker, further SHJ code simplifications
* Make streaming executors concurrent with optimizer re-order
* Update symmetric_hash_join.rs
* Fix target partitions to 8
* Update symmetric_hash_join.rs
* Update symmetric_hash_join.rs
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/core/src/execution/context.rs | 24 ++---
.../src/physical_plan/joins/symmetric_hash_join.rs | 101 +++++++++++++--------
2 files changed, 77 insertions(+), 48 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 003d640c29..8a5ca3d023 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1290,6 +1290,18 @@ impl SessionState {
// We need to take care of the rule ordering. They may influence each other.
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
+ // Statistics-based join selection will change the Auto mode to a real join implementation,
+ // like collect left, or hash join, or future sort merge join, which will influence the
+ // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
+ // repartitioning and local sorting steps to meet distribution and ordering requirements.
+ // Therefore, it should run before EnforceDistribution and EnforceSorting.
+ Arc::new(JoinSelection::new()),
+ // If the query is processing infinite inputs, the PipelineFixer rule applies the
+ // necessary transformations to make the query runnable (if it is not already runnable).
+ // If the query cannot be made runnable, the rule emits an error with a diagnostic message.
+ // Since the transformations it applies may alter output partitioning properties of operators
+ // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
+ Arc::new(PipelineFixer::new()),
// In order to increase the parallelism, the Repartition rule will change the
// output partitioning of some operators in the plan tree, which will influence
// other rules. Therefore, it should run as soon as possible. It is optional because:
@@ -1304,18 +1316,6 @@ impl SessionState {
// - Since it will change the output ordering of some operators, it should run
// before JoinSelection and EnforceSorting, which may depend on that.
Arc::new(GlobalSortSelection::new()),
- // Statistics-based join selection will change the Auto mode to a real join implementation,
- // like collect left, or hash join, or future sort merge join, which will influence the
- // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
- // repartitioning and local sorting steps to meet distribution and ordering requirements.
- // Therefore, it should run before EnforceDistribution and EnforceSorting.
- Arc::new(JoinSelection::new()),
- // If the query is processing infinite inputs, the PipelineFixer rule applies the
- // necessary transformations to make the query runnable (if it is not already runnable).
- // If the query can not be made runnable, the rule emits an error with a diagnostic message.
- // Since the transformations it applies may alter output partitioning properties of operators
- // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
- Arc::new(PipelineFixer::new()),
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index c6214e8986..465d89f100 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -380,6 +380,10 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}
+ fn benefits_from_input_partitioning(&self) -> bool {
+ false
+ }
+
fn required_input_distribution(&self) -> Vec<Distribution> {
let (left_expr, right_expr) = self
.on
@@ -388,16 +392,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
.unzip();
// TODO: This will change when we extend collected executions.
vec![
- if self.left.output_partitioning().partition_count() == 1 {
- Distribution::SinglePartition
- } else {
- Distribution::HashPartitioned(left_expr)
- },
- if self.right.output_partitioning().partition_count() == 1 {
- Distribution::SinglePartition
- } else {
- Distribution::HashPartitioned(right_expr)
- },
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
]
}
@@ -1509,7 +1505,7 @@ mod tests {
hash_join_utils::tests::complicated_filter, HashJoinExec, PartitionMode,
};
use crate::physical_plan::{
- collect, common, memory::MemoryExec, repartition::RepartitionExec,
+ common, displayable, memory::MemoryExec, repartition::RepartitionExec,
};
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::register_unbounded_file_with_ordering;
@@ -2182,9 +2178,9 @@ mod tests {
Ok(())
}
- #[tokio::test(flavor = "multi_thread")]
+ #[tokio::test]
async fn join_change_in_planner() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(1);
+ let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new().unwrap();
let left_file_path = tmp_dir.path().join("left.csv");
@@ -2225,21 +2221,37 @@ mod tests {
true,
)
.await?;
- let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
- let physical_plan = df.create_physical_plan().await?;
- let task_ctx = ctx.task_ctx();
- let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
- let formatted = pretty_format_batches(&results).unwrap().to_string();
- let found = formatted
- .lines()
- .any(|line| line.contains("SymmetricHashJoinExec"));
- assert!(found);
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let expected = {
+ [
+ "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, [...]
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+ ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+ expected,
+ actual[..],
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}
- #[tokio::test(flavor = "multi_thread")]
+ #[tokio::test]
async fn join_change_in_planner_without_sort() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(1);
+ let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new()?;
let left_file_path = tmp_dir.path().join("left.csv");
@@ -2262,22 +2274,39 @@ mod tests {
CsvReadOptions::new().schema(&schema).mark_infinite(true),
)
.await?;
- let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
- let physical_plan = df.create_physical_plan().await?;
- let task_ctx = ctx.task_ctx();
- let results = collect(physical_plan.clone(), task_ctx).await?;
- let formatted = pretty_format_batches(&results)?.to_string();
- let found = formatted
- .lines()
- .any(|line| line.contains("SymmetricHashJoinExec"));
- assert!(found);
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let expected = {
+ [
+ "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, [...]
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+ ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+ expected,
+ actual[..],
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}
- #[tokio::test(flavor = "multi_thread")]
+ #[tokio::test]
async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
- let config =
- SessionConfig::new().with_allow_symmetric_joins_without_pruning(false);
+ let config = SessionConfig::new()
+ .with_target_partitions(8)
+ .with_allow_symmetric_joins_without_pruning(false);
let ctx = SessionContext::with_config(config);
let tmp_dir = TempDir::new()?;
let left_file_path = tmp_dir.path().join("left.csv");