You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/02 13:35:35 UTC

[arrow-datafusion] branch extend_collect_join_support updated: Extend support for collectleft mode

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch extend_collect_join_support
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/extend_collect_join_support by this push:
     new f5e54d70f4 Extend support for collectleft mode
f5e54d70f4 is described below

commit f5e54d70f46592893c3181184acd89e0b9c08705
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Fri Jun 2 15:35:26 2023 +0200

    Extend support for collectleft mode
---
 .../core/src/physical_optimizer/join_selection.rs  | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 2589fa625e..9815ce4450 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::joins::{
     CrossJoinExec, HashJoinExec, PartitionMode,
 };
 use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
 
 use super::optimizer::PhysicalOptimizerRule;
 use crate::error::Result;
@@ -292,25 +292,42 @@ fn try_collect_left(
     let right = hash_join.right();
     let join_type = hash_join.join_type();
 
+    let left_supports_by_size = collect_threshold.map_or(true, |threshold| {
+        supports_collect_by_size(&**left, threshold)
+    });
     let left_can_collect = match join_type {
-        JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
+        JoinType::Left | JoinType::Full | JoinType::LeftAnti => {
+            left_supports_by_size
+                && matches!(
+                    left.output_partitioning(),
+                    Partitioning::UnknownPartitioning(_)
+                        | Partitioning::RoundRobinBatch(_)
+                )
+        }
         JoinType::Inner
         | JoinType::LeftSemi
         | JoinType::Right
         | JoinType::RightSemi
-        | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
-            supports_collect_by_size(&**left, threshold)
-        }),
+        | JoinType::RightAnti => left_supports_by_size,
     };
+    let right_supports_by_size = collect_threshold.map_or(true, |threshold| {
+        supports_collect_by_size(&**right, threshold)
+    });
+
     let right_can_collect = match join_type {
-        JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
+        JoinType::Right | JoinType::Full | JoinType::RightAnti => {
+            left_supports_by_size
+                && matches!(
+                    left.output_partitioning(),
+                    Partitioning::UnknownPartitioning(_)
+                        | Partitioning::RoundRobinBatch(_)
+                )
+        }
         JoinType::Inner
         | JoinType::RightSemi
         | JoinType::Left
         | JoinType::LeftSemi
-        | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
-            supports_collect_by_size(&**right, threshold)
-        }),
+        | JoinType::LeftAnti => right_supports_by_size,
     };
     match (left_can_collect, right_can_collect) {
         (true, true) => {