Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/06 21:00:00 UTC

[arrow-datafusion] branch main updated: Moving PipelineFixer above all rules to use ExecutionPlan APIs (#5880)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f40070b0c Moving PipelineFixer above all rules to use ExecutionPlan APIs  (#5880)
4f40070b0c is described below

commit 4f40070b0c76cc0618c402f435e0e69e1ba2a715
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Thu Apr 6 23:59:53 2023 +0300

    Moving PipelineFixer above all rules to use ExecutionPlan APIs  (#5880)
    
    * Increase optimizer performance
    
    * Config added.
    
    * Simplifications and comment improvements
    
    * More simplifications
    
    * Revamping tests for unbounded-unbounded cases.
    
    * Review code
    
    * Move SHJ suitability from PipelineFixer to PipelineChecker, further SHJ code simplifications
    
    * Make streaming executors concurrent with optimizer re-order
    
    * Update symmetric_hash_join.rs
    
    * Set target partitions to 8
    
    * Update symmetric_hash_join.rs
    
    * Update symmetric_hash_join.rs
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/core/src/execution/context.rs           |  24 ++---
 .../src/physical_plan/joins/symmetric_hash_join.rs | 101 +++++++++++++--------
 2 files changed, 77 insertions(+), 48 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 003d640c29..8a5ca3d023 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1290,6 +1290,18 @@ impl SessionState {
         // We need to take care of the rule ordering. They may influence each other.
         let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
             Arc::new(AggregateStatistics::new()),
+            // Statistics-based join selection will change the Auto mode to a real join implementation,
+            // like collect left, or hash join, or future sort merge join, which will influence the
+            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
+            // repartitioning and local sorting steps to meet distribution and ordering requirements.
+            // Therefore, it should run before EnforceDistribution and EnforceSorting.
+            Arc::new(JoinSelection::new()),
+            // If the query is processing infinite inputs, the PipelineFixer rule applies the
+            // necessary transformations to make the query runnable (if it is not already runnable).
+            // If the query can not be made runnable, the rule emits an error with a diagnostic message.
+            // Since the transformations it applies may alter output partitioning properties of operators
+            // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
+            Arc::new(PipelineFixer::new()),
             // In order to increase the parallelism, the Repartition rule will change the
             // output partitioning of some operators in the plan tree, which will influence
             // other rules. Therefore, it should run as soon as possible. It is optional because:
@@ -1304,18 +1316,6 @@ impl SessionState {
             // - Since it will change the output ordering of some operators, it should run
             // before JoinSelection and EnforceSorting, which may depend on that.
             Arc::new(GlobalSortSelection::new()),
-            // Statistics-based join selection will change the Auto mode to a real join implementation,
-            // like collect left, or hash join, or future sort merge join, which will influence the
-            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
-            // repartitioning and local sorting steps to meet distribution and ordering requirements.
-            // Therefore, it should run before EnforceDistribution and EnforceSorting.
-            Arc::new(JoinSelection::new()),
-            // If the query is processing infinite inputs, the PipelineFixer rule applies the
-            // necessary transformations to make the query runnable (if it is not already runnable).
-            // If the query can not be made runnable, the rule emits an error with a diagnostic message.
-            // Since the transformations it applies may alter output partitioning properties of operators
-            // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
-            Arc::new(PipelineFixer::new()),
             // The EnforceDistribution rule is for adding essential repartition to satisfy the required
             // distribution. Please make sure that the whole plan tree is determined before this rule.
             Arc::new(EnforceDistribution::new()),
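For readers unfamiliar with this part of DataFusion: every entry in the vector above implements the PhysicalOptimizerRule trait, and the session applies the rules strictly in order, so a rule that changes partitioning (as PipelineFixer can, by swapping hash join sides) must run before the rules that react to partitioning. The self-contained Rust sketch below models that driver loop; the trait and rule types here are simplified stand-ins, not DataFusion's actual definitions (the real PhysicalOptimizerRule::optimize also receives the session's config options).

    use std::sync::Arc;

    // Toy stand-ins for Arc<dyn ExecutionPlan> and PhysicalOptimizerRule;
    // the point is only that rules are plan -> plan transforms applied in
    // a fixed order, so earlier rules shape what later rules see.
    #[derive(Debug, Clone)]
    struct Plan(String);

    trait OptimizerRule {
        fn name(&self) -> &str;
        fn optimize(&self, plan: Plan) -> Result<Plan, String>;
    }

    struct PipelineFixer;
    impl OptimizerRule for PipelineFixer {
        fn name(&self) -> &str { "PipelineFixer" }
        fn optimize(&self, plan: Plan) -> Result<Plan, String> {
            // e.g. swap hash join sides so an unbounded side can stream,
            // which changes the operator's output partitioning
            Ok(Plan(format!("{} [sides swapped]", plan.0)))
        }
    }

    struct EnforceDistribution;
    impl OptimizerRule for EnforceDistribution {
        fn name(&self) -> &str { "EnforceDistribution" }
        fn optimize(&self, plan: Plan) -> Result<Plan, String> {
            // adds repartitions based on the partitioning it observes, so
            // it must run after anything that changes partitioning
            Ok(Plan(format!("{} [repartitioned]", plan.0)))
        }
    }

    fn main() -> Result<(), String> {
        let rules: Vec<Arc<dyn OptimizerRule>> =
            vec![Arc::new(PipelineFixer), Arc::new(EnforceDistribution)];
        let mut plan = Plan("SymmetricHashJoinExec".to_string());
        for rule in &rules {
            plan = rule.optimize(plan)?;
            println!("after {}: {:?}", rule.name(), plan);
        }
        Ok(())
    }
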
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index c6214e8986..465d89f100 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -380,6 +380,10 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         Ok(children.iter().any(|u| *u))
     }
 
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         let (left_expr, right_expr) = self
             .on
@@ -388,16 +392,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             .unzip();
         // TODO:  This will change when we extend collected executions.
         vec![
-            if self.left.output_partitioning().partition_count() == 1 {
-                Distribution::SinglePartition
-            } else {
-                Distribution::HashPartitioned(left_expr)
-            },
-            if self.right.output_partitioning().partition_count() == 1 {
-                Distribution::SinglePartition
-            } else {
-                Distribution::HashPartitioned(right_expr)
-            },
+            Distribution::HashPartitioned(left_expr),
+            Distribution::HashPartitioned(right_expr),
         ]
     }
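
Two ExecutionPlan API changes in the hunks above do the work: benefits_from_input_partitioning now returns false, which stops the generic Repartition rule from adding extra repartitions around this operator, and required_input_distribution unconditionally demands hash partitioning on the join keys for both children, leaving EnforceDistribution to insert a RepartitionExec wherever a child does not already satisfy the requirement. The following self-contained sketch models that satisfy-or-repartition decision with simplified stand-in types (column names instead of physical expressions); it is an illustration of the mechanism, not DataFusion's code.

    // Simplified stand-ins: column names instead of PhysicalExprs, and
    // plain enums instead of DataFusion's Distribution/Partitioning types.
    #[derive(Debug, Clone, PartialEq)]
    enum Distribution {
        SinglePartition,
        HashPartitioned(Vec<String>),
    }

    #[derive(Debug, Clone, PartialEq)]
    enum Partitioning {
        Single,
        Hash(Vec<String>, usize),
    }

    // Does the child's actual partitioning already satisfy the operator's
    // required input distribution?
    fn satisfies(actual: &Partitioning, required: &Distribution) -> bool {
        match (actual, required) {
            (Partitioning::Single, Distribution::SinglePartition) => true,
            (Partitioning::Hash(cols, _), Distribution::HashPartitioned(req)) => cols == req,
            _ => false,
        }
    }

    // The EnforceDistribution-style decision: keep the child as-is when the
    // requirement holds, otherwise insert a (hash) repartition -- which is
    // how the RepartitionExec nodes in the expected test plans below appear.
    fn enforce(child: Partitioning, required: &Distribution, target_partitions: usize) -> Partitioning {
        if satisfies(&child, required) {
            child
        } else if let Distribution::HashPartitioned(cols) = required {
            Partitioning::Hash(cols.clone(), target_partitions)
        } else {
            Partitioning::Single
        }
    }

    fn main() {
        assert!(satisfies(&Partitioning::Single, &Distribution::SinglePartition));
        let required = Distribution::HashPartitioned(vec!["a2".to_string()]);
        // A single-partition CSV child gets hash-repartitioned to 8 partitions.
        assert_eq!(
            enforce(Partitioning::Single, &required, 8),
            Partitioning::Hash(vec!["a2".to_string()], 8)
        );
    }
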
 
@@ -1509,7 +1505,7 @@ mod tests {
         hash_join_utils::tests::complicated_filter, HashJoinExec, PartitionMode,
     };
     use crate::physical_plan::{
-        collect, common, memory::MemoryExec, repartition::RepartitionExec,
+        common, displayable, memory::MemoryExec, repartition::RepartitionExec,
     };
     use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use crate::test_util::register_unbounded_file_with_ordering;
@@ -2182,9 +2178,9 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn join_change_in_planner() -> Result<()> {
-        let config = SessionConfig::new().with_target_partitions(1);
+        let config = SessionConfig::new().with_target_partitions(8);
         let ctx = SessionContext::with_config(config);
         let tmp_dir = TempDir::new().unwrap();
         let left_file_path = tmp_dir.path().join("left.csv");
@@ -2225,21 +2221,37 @@ mod tests {
             true,
         )
         .await?;
-        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
-        let physical_plan = df.create_physical_plan().await?;
-        let task_ctx = ctx.task_ctx();
-        let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
-        let formatted = pretty_format_batches(&results).unwrap().to_string();
-        let found = formatted
-            .lines()
-            .any(|line| line.contains("SymmetricHashJoinExec"));
-        assert!(found);
+        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+        let dataframe = ctx.sql(sql).await?;
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+        let expected = {
+            [
+                "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } },  [...]
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+            ]
+        };
+        let mut actual: Vec<&str> = formatted.trim().lines().collect();
+        // Remove CSV lines
+        actual.remove(3);
+        actual.remove(5);
+
+        assert_eq!(
+            expected,
+            actual[..],
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
         Ok(())
     }
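
Note the testing pattern this commit switches to: instead of running EXPLAIN through collect and grepping the result batches, the test builds the physical plan directly and renders it with displayable(...).indent(), so the plan shape is verified without ever executing a query over infinite inputs. A minimal standalone version of the same pattern is sketched below, assuming a DataFusion build from the same era as this commit; the table name t, the test name, and the asserted operator name are illustrative.

    use datafusion::error::Result;
    use datafusion::physical_plan::displayable;
    use datafusion::prelude::*;

    #[tokio::test]
    async fn plan_shape_example() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.sql("CREATE TABLE t AS VALUES (1), (2)").await?;
        let df = ctx.sql("SELECT * FROM t").await?;
        // Build the optimized physical plan and render it as an indented
        // tree; the plan shape is asserted without executing the query.
        let plan = df.create_physical_plan().await?;
        let formatted = displayable(plan.as_ref()).indent().to_string();
        // Exact operator names vary by version; a table created via
        // CREATE TABLE AS is typically scanned by a MemoryExec.
        assert!(formatted.contains("MemoryExec"));
        Ok(())
    }
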
 
-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn join_change_in_planner_without_sort() -> Result<()> {
-        let config = SessionConfig::new().with_target_partitions(1);
+        let config = SessionConfig::new().with_target_partitions(8);
         let ctx = SessionContext::with_config(config);
         let tmp_dir = TempDir::new()?;
         let left_file_path = tmp_dir.path().join("left.csv");
@@ -2262,22 +2274,39 @@ mod tests {
             CsvReadOptions::new().schema(&schema).mark_infinite(true),
         )
         .await?;
-        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
-        let physical_plan = df.create_physical_plan().await?;
-        let task_ctx = ctx.task_ctx();
-        let results = collect(physical_plan.clone(), task_ctx).await?;
-        let formatted = pretty_format_batches(&results)?.to_string();
-        let found = formatted
-            .lines()
-            .any(|line| line.contains("SymmetricHashJoinExec"));
-        assert!(found);
+        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+        let dataframe = ctx.sql(sql).await?;
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+        let expected = {
+            [
+                "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } },  [...]
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+            ]
+        };
+        let mut actual: Vec<&str> = formatted.trim().lines().collect();
+        // Remove CSV lines
+        actual.remove(3);
+        actual.remove(5);
+
+        assert_eq!(
+            expected,
+            actual[..],
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
         Ok(())
     }
 
-    #[tokio::test(flavor = "multi_thread")]
+    #[tokio::test]
     async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
-        let config =
-            SessionConfig::new().with_allow_symmetric_joins_without_pruning(false);
+        let config = SessionConfig::new()
+            .with_target_partitions(8)
+            .with_allow_symmetric_joins_without_pruning(false);
         let ctx = SessionContext::with_config(config);
         let tmp_dir = TempDir::new()?;
         let left_file_path = tmp_dir.path().join("left.csv");