You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/02 13:35:35 UTC
[arrow-datafusion] branch extend_collect_join_support updated: Extend support for collectleft mode
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch extend_collect_join_support
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/extend_collect_join_support by this push:
new f5e54d70f4 Extend support for collectleft mode
f5e54d70f4 is described below
commit f5e54d70f46592893c3181184acd89e0b9c08705
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Fri Jun 2 15:35:26 2023 +0200
Extend support for collectleft mode
---
.../core/src/physical_optimizer/join_selection.rs | 35 ++++++++++++++++------
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 2589fa625e..9815ce4450 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, PartitionMode,
};
use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
use super::optimizer::PhysicalOptimizerRule;
use crate::error::Result;
@@ -292,25 +292,42 @@ fn try_collect_left(
let right = hash_join.right();
let join_type = hash_join.join_type();
+ let left_supports_by_size = collect_threshold.map_or(true, |threshold| {
+ supports_collect_by_size(&**left, threshold)
+ });
let left_can_collect = match join_type {
- JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
+ JoinType::Left | JoinType::Full | JoinType::LeftAnti => {
+ left_supports_by_size
+ && matches!(
+ left.output_partitioning(),
+ Partitioning::UnknownPartitioning(_)
+ | Partitioning::RoundRobinBatch(_)
+ )
+ }
JoinType::Inner
| JoinType::LeftSemi
| JoinType::Right
| JoinType::RightSemi
- | JoinType::RightAnti => collect_threshold.map_or(true, |threshold| {
- supports_collect_by_size(&**left, threshold)
- }),
+ | JoinType::RightAnti => left_supports_by_size,
};
+ let right_supports_by_size = collect_threshold.map_or(true, |threshold| {
+ supports_collect_by_size(&**right, threshold)
+ });
+
let right_can_collect = match join_type {
- JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
+ JoinType::Right | JoinType::Full | JoinType::RightAnti => {
+ left_supports_by_size
+ && matches!(
+ left.output_partitioning(),
+ Partitioning::UnknownPartitioning(_)
+ | Partitioning::RoundRobinBatch(_)
+ )
+ }
JoinType::Inner
| JoinType::RightSemi
| JoinType::Left
| JoinType::LeftSemi
- | JoinType::LeftAnti => collect_threshold.map_or(true, |threshold| {
- supports_collect_by_size(&**right, threshold)
- }),
+ | JoinType::LeftAnti => right_supports_by_size,
};
match (left_can_collect, right_can_collect) {
(true, true) => {