You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/01/17 21:38:49 UTC

[arrow-datafusion] branch master updated: SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 933e5bad2 SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)
933e5bad2 is described below

commit 933e5bad27c70e7e674a1fb11e7b203184ad6b65
Author: mingmwang <mi...@ebay.com>
AuthorDate: Wed Jan 18 05:38:42 2023 +0800

    SUPPORT SEMI/ANTI JOIN SQL syntax in DataFusion (#4932)
---
 datafusion/core/tests/sql/joins.rs  | 112 ++++++++++++++++++++++++++++++++++++
 datafusion/sql/src/relation/join.rs |  28 +++++++++
 2 files changed, 140 insertions(+)

diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index c20c66e10..c73a3ad20 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2055,6 +2055,58 @@ async fn left_semi_join() -> Result<()> {
             "+-------+",
         ];
         assert_batches_eq!(expected, &actual);
+
+        let sql = "SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
+        let msg = format!("Creating logical plan for '{sql}'");
+        let dataframe = ctx.sql(sql).await.expect(&msg);
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let expected = if repartition_joins {
+            vec![
+                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "  CoalescePartitionsExec",
+                "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
+                "      CoalesceBatchesExec: target_batch_size=4096",
+                "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
+                "          CoalesceBatchesExec: target_batch_size=4096",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "                MemoryExec: partitions=1, partition_sizes=[1]",
+                "          CoalesceBatchesExec: target_batch_size=4096",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "                MemoryExec: partitions=1, partition_sizes=[1]",
+            ]
+        } else {
+            vec![
+                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "  CoalescePartitionsExec",
+                "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "        CoalesceBatchesExec: target_batch_size=4096",
+                "          HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
+                "            MemoryExec: partitions=1, partition_sizes=[1]",
+                "            MemoryExec: partitions=1, partition_sizes=[1]",
+            ]
+        };
+        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+",
+            "| t1_id | t1_name |",
+            "+-------+---------+",
+            "| 11    | a       |",
+            "| 11    | a       |",
+            "| 22    | b       |",
+            "| 44    | d       |",
+            "+-------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
     }
 
     Ok(())
@@ -2093,6 +2145,18 @@ async fn left_anti_join() -> Result<()> {
             "+-------+",
         ];
         assert_batches_eq!(expected, &actual);
+
+        let sql = "SELECT t1_id, t1_name FROM t1 LEFT ANTI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+",
+            "| t1_id | t1_name |",
+            "+-------+---------+",
+            "| 33    | c       |",
+            "|       | e       |",
+            "+-------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
     }
 
     Ok(())
@@ -2209,6 +2273,54 @@ async fn right_semi_join() -> Result<()> {
             "+-------+---------+--------+",
         ];
         assert_batches_eq!(expected, &actual);
+
+        let sql = "SELECT t1_id, t1_name, t1_int FROM t2 RIGHT SEMI JOIN t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
+        let msg = format!("Creating logical plan for '{sql}'");
+        let dataframe = ctx.sql(sql).await.expect(&msg);
+        let physical_plan = dataframe.create_physical_plan().await?;
+        let expected = if repartition_joins {
+            vec![ "SortExec: [t1_id@0 ASC NULLS LAST]",
+                  "  CoalescePartitionsExec",
+                  "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
+                  "      CoalesceBatchesExec: target_batch_size=4096",
+                  "        HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
+                  "          CoalesceBatchesExec: target_batch_size=4096",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "                MemoryExec: partitions=1, partition_sizes=[1]",
+                  "          CoalesceBatchesExec: target_batch_size=4096",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "                MemoryExec: partitions=1, partition_sizes=[1]",
+            ]
+        } else {
+            vec![
+                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "  CoalescePartitionsExec",
+                "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "        CoalesceBatchesExec: target_batch_size=4096",
+                "          HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
+                "            MemoryExec: partitions=1, partition_sizes=[1]",
+                "            MemoryExec: partitions=1, partition_sizes=[1]",
+            ]
+        };
+        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        let actual = execute_to_batches(&ctx, sql).await;
+        let expected = vec![
+            "+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int |",
+            "+-------+---------+--------+",
+            "| 11    | a       | 1      |",
+            "+-------+---------+--------+",
+        ];
+        assert_batches_eq!(expected, &actual);
     }
 
     Ok(())
diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs
index da7943eaa..6f2233f39 100644
--- a/datafusion/sql/src/relation/join.rs
+++ b/datafusion/sql/src/relation/join.rs
@@ -76,6 +76,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             JoinOperator::Inner(constraint) => {
                 self.parse_join(left, right, constraint, JoinType::Inner, planner_context)
             }
+            JoinOperator::LeftSemi(constraint) => self.parse_join(
+                left,
+                right,
+                constraint,
+                JoinType::LeftSemi,
+                planner_context,
+            ),
+            JoinOperator::RightSemi(constraint) => self.parse_join(
+                left,
+                right,
+                constraint,
+                JoinType::RightSemi,
+                planner_context,
+            ),
+            JoinOperator::LeftAnti(constraint) => self.parse_join(
+                left,
+                right,
+                constraint,
+                JoinType::LeftAnti,
+                planner_context,
+            ),
+            JoinOperator::RightAnti(constraint) => self.parse_join(
+                left,
+                right,
+                constraint,
+                JoinType::RightAnti,
+                planner_context,
+            ),
             JoinOperator::FullOuter(constraint) => {
                 self.parse_join(left, right, constraint, JoinType::Full, planner_context)
             }