Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/04/05 12:03:35 UTC

[GitHub] [arrow-datafusion] metesynnada opened a new pull request, #5880: Moving PipelineFixer above all rules to use ExecutionPlan APIs

metesynnada opened a new pull request, #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880

   # Which issue does this PR close?
   
   Closes #5878.
   
   # Rationale for this change
   Some optimizer rules depend strongly on each other, so the order in which they run matters. PipelineFixer (and possibly more rules in the future) can replace an operator in the `ExecutionPlan`, and the replacement operator can report different properties (ordering, distribution, and so on) than the original.
   
   I suggest running the rules that swap executors before the rules that enforce sorting, distribution, etc.
   
   - Before PR
   ```
   [
       "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
       "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
       "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
       "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
       "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
   ]
   ```
   `SymmetricHashJoinExec` (SHJ) explicitly sets `benefits_from_input_partitioning` to `false`. However, this has no effect here, because the `RepartitionExec` with `RoundRobinBatch` partitioning is added before the `HashJoin -> SymmetricHashJoin` change.
   
   - After PR
   ```
   [
       "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
       "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
       "  CoalesceBatchesExec: target_batch_size=8192",
       "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
       "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
   ]
   ```
   
   
   # What changes are included in this PR?
   
   The optimizer rules are reordered so that they can use the `benefits_from_input_partitioning` API. If PipelineFixer runs before (almost) every other rule, the later rules see the replaced executors and can use their APIs.
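   
   A toy sketch of why the ordering matters. This is not DataFusion code: `Plan`, `pipeline_fixer`, `add_round_robin`, `count_round_robin`, and `sample_join` are hypothetical stand-ins, and the sketch swaps every join unconditionally, which is a simplification.
   
   ```rust
   // Toy model only: hypothetical stand-ins, not DataFusion's real types.
   #[derive(Debug, Clone, PartialEq)]
   enum Plan {
       Scan,
       RoundRobin(Box<Plan>),
       HashJoin(Box<Plan>, Box<Plan>),
       SymmetricHashJoin(Box<Plan>, Box<Plan>),
   }
   
   impl Plan {
       /// Mirrors the idea behind `benefits_from_input_partitioning`:
       /// the symmetric join opts out, every other node opts in.
       fn benefits_from_input_partitioning(&self) -> bool {
           !matches!(self, Plan::SymmetricHashJoin(..))
       }
   }
   
   /// Stand-in for PipelineFixer: turns every HashJoin into a SymmetricHashJoin.
   fn pipeline_fixer(plan: Plan) -> Plan {
       match plan {
           Plan::HashJoin(l, r) | Plan::SymmetricHashJoin(l, r) => Plan::SymmetricHashJoin(
               Box::new(pipeline_fixer(*l)),
               Box::new(pipeline_fixer(*r)),
           ),
           Plan::RoundRobin(c) => Plan::RoundRobin(Box::new(pipeline_fixer(*c))),
           Plan::Scan => Plan::Scan,
       }
   }
   
   /// Stand-in for the repartitioning rule: wraps a join's inputs in a
   /// round-robin node only if the join says it benefits from that.
   fn add_round_robin(plan: Plan) -> Plan {
       let wants = plan.benefits_from_input_partitioning();
       let wrap = |child: Box<Plan>| {
           let child = add_round_robin(*child);
           Box::new(if wants { Plan::RoundRobin(Box::new(child)) } else { child })
       };
       match plan {
           Plan::HashJoin(l, r) => Plan::HashJoin(wrap(l), wrap(r)),
           Plan::SymmetricHashJoin(l, r) => Plan::SymmetricHashJoin(wrap(l), wrap(r)),
           other => other,
       }
   }
   
   /// Counts the round-robin nodes in a plan.
   fn count_round_robin(plan: &Plan) -> usize {
       match plan {
           Plan::Scan => 0,
           Plan::RoundRobin(c) => 1 + count_round_robin(c),
           Plan::HashJoin(l, r) | Plan::SymmetricHashJoin(l, r) => {
               count_round_robin(l) + count_round_robin(r)
           }
       }
   }
   
   /// A hash join over two scans, as the planner would first produce it.
   fn sample_join() -> Plan {
       Plan::HashJoin(Box::new(Plan::Scan), Box::new(Plan::Scan))
   }
   ```
   
   With these definitions, `pipeline_fixer(add_round_robin(sample_join()))` (the old order) leaves two round-robin nodes under the join, while `add_round_robin(pipeline_fixer(sample_join()))` (the new order) leaves none.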
   
   # Are these changes tested?
   
   Yes.
   
   # Are there any user-facing changes?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5880: Moving PipelineFixer above all rules to use ExecutionPlan APIs

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880#discussion_r1159638486


##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -2225,21 +2221,39 @@ mod tests {
             true,
         )
         .await?;
-        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
-        let physical_plan = df.create_physical_plan().await?;
-        let task_ctx = ctx.task_ctx();
-        let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
-        let formatted = pretty_format_batches(&results).unwrap().to_string();
-        let found = formatted
-            .lines()
-            .any(|line| line.contains("SymmetricHashJoinExec"));
-        assert!(found);
+        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+        let dataframe = ctx.sql(sql).await?;
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = crate::physical_plan::displayable(physical_plan.as_ref())

Review Comment:
   Maybe you can import `crate::physical_plan` above and use `displayable` directly here.





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5880: Moving PipelineFixer above all rules to use ExecutionPlan APIs

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880#discussion_r1159635631


##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -2225,21 +2221,39 @@ mod tests {
             true,
         )
         .await?;
-        let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
-        let physical_plan = df.create_physical_plan().await?;
-        let task_ctx = ctx.task_ctx();
-        let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
-        let formatted = pretty_format_batches(&results).unwrap().to_string();
-        let found = formatted
-            .lines()
-            .any(|line| line.contains("SymmetricHashJoinExec"));
-        assert!(found);
+        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+        let dataframe = ctx.sql(sql).await?;
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let formatted = crate::physical_plan::displayable(physical_plan.as_ref())
+            .indent()
+            .to_string();
+        let expected = {
+            [
+                "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+                "  CoalesceBatchesExec: target_batch_size=8192",
+                "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
+                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+            ]
+        };
+        let mut actual: Vec<&str> = formatted.trim().lines().collect();
+        // Remove CSV lines
+        actual.remove(3);
+        actual.remove(5);
+
+        assert_eq!(
+            expected,
+            actual[..],
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
         Ok(())
     }
 
     #[tokio::test(flavor = "multi_thread")]
     async fn join_change_in_planner_without_sort() -> Result<()> {

Review Comment:
   These tests only check physical plan generation and do not execute anything, so `flavor = "multi_thread"` seems unnecessary. You can write `join_change_in_planner_without_sort_not_allowed` and `join_change_in_planner_without_sort` without `flavor = "multi_thread"`, as in `join_change_in_planner`.





[GitHub] [arrow-datafusion] alamb merged pull request #5880: Moving PipelineFixer above all rules to use ExecutionPlan APIs

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880

