You are viewing a plain-text version of this content. The canonical version was linked from the word "here", but that hyperlink was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/01/17 21:38:49 UTC
[arrow-datafusion] branch master updated: SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 933e5bad2 SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)
933e5bad2 is described below
commit 933e5bad27c70e7e674a1fb11e7b203184ad6b65
Author: mingmwang <mi...@ebay.com>
AuthorDate: Wed Jan 18 05:38:42 2023 +0800
SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)
---
datafusion/core/tests/sql/joins.rs | 112 ++++++++++++++++++++++++++++++++++++
datafusion/sql/src/relation/join.rs | 28 +++++++++
2 files changed, 140 insertions(+)
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index c20c66e10..c73a3ad20 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2055,6 +2055,58 @@ async fn left_semi_join() -> Result<()> {
"+-------+",
];
assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(sql).await.expect(&msg);
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let expected = if repartition_joins {
+ vec![
+ "SortExec: [t1_id@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ]
+ } else {
+ vec![
+ "SortExec: [t1_id@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ]
+ };
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------+---------+",
+ "| t1_id | t1_name |",
+ "+-------+---------+",
+ "| 11 | a |",
+ "| 11 | a |",
+ "| 22 | b |",
+ "| 44 | d |",
+ "+-------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
}
Ok(())
@@ -2093,6 +2145,18 @@ async fn left_anti_join() -> Result<()> {
"+-------+",
];
assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT t1_id, t1_name FROM t1 LEFT ANTI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------+---------+",
+ "| t1_id | t1_name |",
+ "+-------+---------+",
+ "| 33 | c |",
+ "| | e |",
+ "+-------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
}
Ok(())
@@ -2209,6 +2273,54 @@ async fn right_semi_join() -> Result<()> {
"+-------+---------+--------+",
];
assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT t1_id, t1_name, t1_int FROM t2 RIGHT SEMI JOIN t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(sql).await.expect(&msg);
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let expected = if repartition_joins {
+ vec![ "SortExec: [t1_id@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ]
+ } else {
+ vec![
+ "SortExec: [t1_id@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ]
+ };
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-------+---------+--------+",
+ "| t1_id | t1_name | t1_int |",
+ "+-------+---------+--------+",
+ "| 11 | a | 1 |",
+ "+-------+---------+--------+",
+ ];
+ assert_batches_eq!(expected, &actual);
}
Ok(())
diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs
index da7943eaa..6f2233f39 100644
--- a/datafusion/sql/src/relation/join.rs
+++ b/datafusion/sql/src/relation/join.rs
@@ -76,6 +76,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
JoinOperator::Inner(constraint) => {
self.parse_join(left, right, constraint, JoinType::Inner, planner_context)
}
+ JoinOperator::LeftSemi(constraint) => self.parse_join(
+ left,
+ right,
+ constraint,
+ JoinType::LeftSemi,
+ planner_context,
+ ),
+ JoinOperator::RightSemi(constraint) => self.parse_join(
+ left,
+ right,
+ constraint,
+ JoinType::RightSemi,
+ planner_context,
+ ),
+ JoinOperator::LeftAnti(constraint) => self.parse_join(
+ left,
+ right,
+ constraint,
+ JoinType::LeftAnti,
+ planner_context,
+ ),
+ JoinOperator::RightAnti(constraint) => self.parse_join(
+ left,
+ right,
+ constraint,
+ JoinType::RightAnti,
+ planner_context,
+ ),
JoinOperator::FullOuter(constraint) => {
self.parse_join(left, right, constraint, JoinType::Full, planner_context)
}