Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/13 00:55:17 UTC
[arrow-datafusion] branch main updated: Port tests in joins.rs to sqllogictest (#6642)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3fc009b41e Port tests in joins.rs to sqllogictest (#6642)
3fc009b41e is described below
commit 3fc009b41e392d50b67cea9dddcc6e7014b3b78b
Author: zhenxing jiang <ji...@gmail.com>
AuthorDate: Tue Jun 13 08:55:10 2023 +0800
Port tests in joins.rs to sqllogictest (#6642)
* Port left_join_using_2 and timestamp_join to sqllogictest
* Move the remaining joins test cases to sqllogictest
* Fix clippy lints
---
datafusion/core/tests/sql/joins.rs | 625 ---------------------
datafusion/core/tests/sql/mod.rs | 289 ----------
datafusion/core/tests/sqllogictests/src/main.rs | 11 +
datafusion/core/tests/sqllogictests/src/setup.rs | 318 ++++++++++-
.../core/tests/sqllogictests/test_files/joins.slt | 600 ++++++++++++++++++++
5 files changed, 928 insertions(+), 915 deletions(-)
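
For readers new to the target format: a .slt file is a plain-text script of
directives. "statement ok" asserts that a statement succeeds; "query <types>"
asserts a query's results, where the type string carries one letter per output
column (in this file: I integer, T text, D date, R real/decimal, P timestamp,
? other such as dictionary), an optional "rowsort" makes the comparison
order-insensitive, and expected rows follow a "----" line. A minimal sketch,
illustrative only and not part of this commit:

    # statements assert success only
    statement ok
    set datafusion.explain.logical_plan_only = false

    # one type letter per output column (two integers here)
    query II
    SELECT 1, 2
    ----
    1 2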
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 054db2c3f7..116a3c1a79 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -37,144 +37,6 @@ async fn nestedjoin_with_alias() -> Result<()> {
Ok(())
}
-#[tokio::test]
-async fn join_timestamp() -> Result<()> {
- let ctx = SessionContext::new();
- ctx.register_table("t", table_with_timestamps()).unwrap();
-
- let expected = vec![
- "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- "| nanos | micros | millis | secs | name | nanos | micros | millis | secs | name |",
- "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- "| 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 |",
- "| 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 | 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 |",
- "| 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 |",
- "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- ];
-
- let results = execute_to_batches(
- &ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.nanos = t2.nanos",
- )
- .await;
-
- assert_batches_sorted_eq!(expected, &results);
-
- let results = execute_to_batches(
- &ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.micros = t2.micros",
- )
- .await;
-
- assert_batches_sorted_eq!(expected, &results);
-
- let results = execute_to_batches(
- &ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.millis = t2.millis",
- )
- .await;
-
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn left_join_using_2() -> Result<()> {
- let results = execute_with_partition(
- "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 0 | 1 |",
- "| 0 | 2 |",
- "| 0 | 3 |",
- "| 0 | 4 |",
- "| 0 | 5 |",
- "| 0 | 6 |",
- "| 0 | 7 |",
- "| 0 | 8 |",
- "| 0 | 9 |",
- "| 0 | 10 |",
- "+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
-}
-
-#[tokio::test]
-async fn left_join_using_join_key_projection() -> Result<()> {
- let results = execute_with_partition(
- "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+----+",
- "| c1 | c2 | c2 |",
- "+----+----+----+",
- "| 0 | 1 | 1 |",
- "| 0 | 2 | 2 |",
- "| 0 | 3 | 3 |",
- "| 0 | 4 | 4 |",
- "| 0 | 5 | 5 |",
- "| 0 | 6 | 6 |",
- "| 0 | 7 | 7 |",
- "| 0 | 8 | 8 |",
- "| 0 | 9 | 9 |",
- "| 0 | 10 | 10 |",
- "+----+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
-}
-
-#[tokio::test]
-async fn left_join_2() -> Result<()> {
- let results = execute_with_partition(
- "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+----+",
- "| c1 | c2 | c2 |",
- "+----+----+----+",
- "| 0 | 1 | 1 |",
- "| 0 | 2 | 2 |",
- "| 0 | 3 | 3 |",
- "| 0 | 4 | 4 |",
- "| 0 | 5 | 5 |",
- "| 0 | 6 | 6 |",
- "| 0 | 7 | 7 |",
- "| 0 | 8 | 8 |",
- "| 0 | 9 | 9 |",
- "| 0 | 10 | 10 |",
- "+----+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
-}
-
#[tokio::test]
async fn join_partitioned() -> Result<()> {
// self join on partition id (workaround for duplicate column name)
@@ -192,388 +54,6 @@ async fn join_partitioned() -> Result<()> {
Ok(())
}
-#[tokio::test]
-async fn hash_join_with_date32() -> Result<()> {
- let ctx = create_hashjoin_datatype_context()?;
-
- // inner join on hash supported data type (Date32)
- let sql = "select * from t1 join t2 on t1.c1 = t2.c1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Inner Join: t1.c1 = t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "| 1970-01-04 | | -123.12 | jkl | 1970-01-04 | | 789.00 | |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_date64() -> Result<()> {
- let ctx = create_hashjoin_datatype_context()?;
-
- // left join on hash supported data type (Date64)
- let sql = "select * from t1 left join t2 on t1.c2 = t2.c2";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Left Join: t1.c2 = t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
- "| | 1970-01-04T00:00:00 | 789.00 | ghi | | 1970-01-04T00:00:00 | 0.00 | qwerty |",
- "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "| 1970-01-03 | 1970-01-03T00:00:00 | 456.00 | def | | | | |",
- "| 1970-01-04 | | -123.12 | jkl | | | | |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_decimal() -> Result<()> {
- let ctx = create_hashjoin_datatype_context()?;
-
- // right join on hash supported data type (Decimal)
- let sql = "select * from t1 right join t2 on t1.c3 = t2.c3";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- "| | | | | | | 100000.00 | abcdefg |",
- "| | | | | | 1970-01-04T00:00:00 | 0.00 | qwerty |",
- "| | 1970-01-04T00:00:00 | 789.00 | ghi | 1970-01-04 | | 789.00 | |",
- "| 1970-01-04 | | -123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_dictionary() -> Result<()> {
- let ctx = create_hashjoin_datatype_context()?;
-
- // inner join on hash supported data type (Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)))
- let sql = "select * from t1 join t2 on t1.c4 = t2.c4";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Inner Join: t1.c4 = t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
- " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
- "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn sort_merge_join_on_date32() -> Result<()> {
- let ctx = create_sort_merge_join_datatype_context()?;
-
- // inner sort merge join on data type (Date32)
- let sql = "select * from t1 join t2 on t1.c1 = t2.c1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = vec![
- "SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]",
- " SortExec: expr=[c1@0 ASC]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " SortExec: expr=[c1@0 ASC]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "| 1970-01-04 | | -123.12 | jkl | 1970-01-04 | | 789.00 | |",
- "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn sort_merge_join_on_decimal() -> Result<()> {
- let ctx = create_sort_merge_join_datatype_context()?;
-
- // right join on data type (Decimal)
- let sql = "select * from t1 right join t2 on t1.c3 = t2.c3";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = vec![
- "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]",
- " SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
- " SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
- " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " SortExec: expr=[c3@2 ASC]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = vec![
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |",
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- "| | | | | | | 100000.00 | abcdefg |",
- "| | | | | | 1970-01-04T00:00:00 | 0.00 | qwerty |",
- "| | 1970-01-04T00:00:00 | 789.00 | ghi | 1970-01-04 | | 789.00 | |",
- "| 1970-01-04 | | -123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
- "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn left_semi_join() -> Result<()> {
- let test_repartition_joins = vec![true, false];
- for repartition_joins in test_repartition_joins {
- let ctx = create_left_semi_anti_join_context_with_null_ids(
- "t1_id",
- "t2_id",
- repartition_joins,
- )
- .unwrap();
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = if repartition_joins {
- vec![
- "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- } else {
- vec![
- "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- };
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+",
- "| t1_id | t1_name |",
- "+-------+---------+",
- "| 11 | a |",
- "| 11 | a |",
- "| 22 | b |",
- "| 44 | d |",
- "+-------+---------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id) ORDER BY t1_id";
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+",
- "| t1_id | t1_name |",
- "+-------+---------+",
- "| 11 | a |",
- "| 11 | a |",
- "| 22 | b |",
- "| 44 | d |",
- "+-------+---------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- let sql = "SELECT t1_id FROM t1 INTERSECT SELECT t2_id FROM t2 ORDER BY t1_id";
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+",
- "| t1_id |",
- "+-------+",
- "| 11 |",
- "| 22 |",
- "| 44 |",
- "| |",
- "+-------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- let sql = "SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = if repartition_joins {
- vec![
- "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- } else {
- vec![
- "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- };
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+",
- "| t1_id | t1_name |",
- "+-------+---------+",
- "| 11 | a |",
- "| 11 | a |",
- "| 22 | b |",
- "| 44 | d |",
- "+-------+---------+",
- ];
- assert_batches_eq!(expected, &actual);
- }
-
- Ok(())
-}
-
#[tokio::test]
#[ignore = "Test ignored, will be enabled after fixing the NAAJ bug"]
// https://github.com/apache/arrow-datafusion/issues/4211
@@ -595,108 +75,3 @@ async fn null_aware_left_anti_join() -> Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn right_semi_join() -> Result<()> {
- let test_repartition_joins = vec![true, false];
- for repartition_joins in test_repartition_joins {
- let ctx = create_right_semi_anti_join_context_with_null_ids(
- "t1_id",
- "t2_id",
- repartition_joins,
- )
- .unwrap();
-
- let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = if repartition_joins {
- vec![
- "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- } else {
- vec![
- "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- };
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+--------+",
- "| t1_id | t1_name | t1_int |",
- "+-------+---------+--------+",
- "| 11 | a | 1 |",
- "+-------+---------+--------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- let sql = "SELECT t1_id, t1_name, t1_int FROM t2 RIGHT SEMI JOIN t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let expected = if repartition_joins {
- vec![
- "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- } else {
- vec![
- "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ]
- };
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+--------+",
- "| t1_id | t1_name | t1_int |",
- "+-------+---------+--------+",
- "| 11 | a | 1 |",
- "+-------+---------+--------+",
- ];
- assert_batches_eq!(expected, &actual);
- }
-
- Ok(())
-}
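
A porting detail visible in this diff: assert_batches_sorted_eq! in the removed
tests corresponds to the rowsort directive in the new joins.slt, while
assert_batches_eq! corresponds to a plain query whose expected rows must
already be in output order. As a hypothetical illustration against the
left_semi_anti_join_table_t1 data registered later in this diff (rowsort
compares rows as sorted strings, so NULL sorts after the digits):

    # order-insensitive comparison, mirroring assert_batches_sorted_eq!
    query I rowsort
    SELECT t1_id FROM left_semi_anti_join_table_t1
    ----
    11
    11
    22
    33
    44
    NULL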
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 42de0d23a3..db2e7cfcff 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -25,7 +25,6 @@ use arrow::{
use chrono::prelude::*;
use chrono::Duration;
-use datafusion::config::ConfigOptions;
use datafusion::datasource::TableProvider;
use datafusion::logical_expr::{Aggregate, LogicalPlan, TableScan};
use datafusion::physical_plan::metrics::MetricValue;
@@ -351,207 +350,6 @@ fn create_left_semi_anti_join_context_with_null_ids(
Ok(ctx)
}
-fn create_right_semi_anti_join_context_with_null_ids(
- column_left: &str,
- column_right: &str,
- repartition_joins: bool,
-) -> Result<SessionContext> {
- let ctx = SessionContext::with_config(
- SessionConfig::new()
- .with_repartition_joins(repartition_joins)
- .with_target_partitions(2)
- .with_batch_size(4096),
- );
-
- let t1_schema = Arc::new(Schema::new(vec![
- Field::new(column_left, DataType::UInt32, true),
- Field::new("t1_name", DataType::Utf8, true),
- Field::new("t1_int", DataType::UInt32, true),
- ]));
- let t1_data = RecordBatch::try_new(
- t1_schema,
- vec![
- Arc::new(UInt32Array::from(vec![
- Some(11),
- Some(22),
- Some(33),
- Some(44),
- None,
- ])),
- Arc::new(StringArray::from(vec![
- Some("a"),
- Some("b"),
- Some("c"),
- Some("d"),
- Some("e"),
- ])),
- Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])),
- ],
- )?;
- ctx.register_batch("t1", t1_data)?;
-
- let t2_schema = Arc::new(Schema::new(vec![
- Field::new(column_right, DataType::UInt32, true),
- Field::new("t2_name", DataType::Utf8, true),
- ]));
- // t2 data size is smaller than t1
- let t2_data = RecordBatch::try_new(
- t2_schema,
- vec![
- Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])),
- Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])),
- ],
- )?;
- ctx.register_batch("t2", t2_data)?;
-
- Ok(ctx)
-}
-
-fn create_hashjoin_datatype_context() -> Result<SessionContext> {
- let ctx = SessionContext::new();
-
- let t1_schema = Schema::new(vec![
- Field::new("c1", DataType::Date32, true),
- Field::new("c2", DataType::Date64, true),
- Field::new("c3", DataType::Decimal128(5, 2), true),
- Field::new(
- "c4",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- ]);
- let dict1: DictionaryArray<Int32Type> =
- vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
- let t1_data = RecordBatch::try_new(
- Arc::new(t1_schema),
- vec![
- Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
- Arc::new(Date64Array::from(vec![
- Some(86400000),
- Some(172800000),
- Some(259200000),
- None,
- ])),
- Arc::new(
- Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
- .with_precision_and_scale(5, 2)
- .unwrap(),
- ),
- Arc::new(dict1),
- ],
- )?;
- ctx.register_batch("t1", t1_data)?;
-
- let t2_schema = Schema::new(vec![
- Field::new("c1", DataType::Date32, true),
- Field::new("c2", DataType::Date64, true),
- Field::new("c3", DataType::Decimal128(10, 2), true),
- Field::new(
- "c4",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- ]);
- let dict2: DictionaryArray<Int32Type> =
- vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect();
- let t2_data = RecordBatch::try_new(
- Arc::new(t2_schema),
- vec![
- Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
- Arc::new(Date64Array::from(vec![
- Some(86400000),
- None,
- Some(259200000),
- None,
- ])),
- Arc::new(
- Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
- .with_precision_and_scale(10, 2)
- .unwrap(),
- ),
- Arc::new(dict2),
- ],
- )?;
- ctx.register_batch("t2", t2_data)?;
-
- Ok(ctx)
-}
-
-fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
- let mut config = ConfigOptions::new();
- config.optimizer.prefer_hash_join = false;
- config.execution.target_partitions = 2;
- config.execution.batch_size = 4096;
-
- let ctx = SessionContext::with_config(config.into());
-
- let t1_schema = Schema::new(vec![
- Field::new("c1", DataType::Date32, true),
- Field::new("c2", DataType::Date64, true),
- Field::new("c3", DataType::Decimal128(5, 2), true),
- Field::new(
- "c4",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- ]);
- let dict1: DictionaryArray<Int32Type> =
- vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
- let t1_data = RecordBatch::try_new(
- Arc::new(t1_schema),
- vec![
- Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
- Arc::new(Date64Array::from(vec![
- Some(86400000),
- Some(172800000),
- Some(259200000),
- None,
- ])),
- Arc::new(
- Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
- .with_precision_and_scale(5, 2)
- .unwrap(),
- ),
- Arc::new(dict1),
- ],
- )?;
- ctx.register_batch("t1", t1_data)?;
-
- let t2_schema = Schema::new(vec![
- Field::new("c1", DataType::Date32, true),
- Field::new("c2", DataType::Date64, true),
- Field::new("c3", DataType::Decimal128(10, 2), true),
- Field::new(
- "c4",
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
- true,
- ),
- ]);
- let dict2: DictionaryArray<Int32Type> =
- vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect();
- let t2_data = RecordBatch::try_new(
- Arc::new(t2_schema),
- vec![
- Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
- Arc::new(Date64Array::from(vec![
- Some(86400000),
- None,
- Some(259200000),
- None,
- ])),
- Arc::new(
- Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
- .with_precision_and_scale(10, 2)
- .unwrap(),
- ),
- Arc::new(dict2),
- ],
- )?;
- ctx.register_batch("t2", t2_data)?;
-
- Ok(ctx)
-}
-
fn get_tpch_table_schema(table: &str) -> Schema {
match table {
"customer" => Schema::new(vec![
@@ -1106,93 +904,6 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
.collect::<Vec<_>>()
}
-/// Return a new table provider containing all of the supported timestamp types
-pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
- let batch = make_timestamps();
- let schema = batch.schema();
- let partitions = vec![vec![batch]];
- Arc::new(MemTable::try_new(schema, partitions).unwrap())
-}
-
-/// Return record batch with all of the supported timestamp types
-/// values
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "secs" --> TimestampSecondArray
-/// "names" --> StringArray
-pub fn make_timestamps() -> RecordBatch {
- let ts_strings = vec![
- Some("2018-11-13T17:11:10.011375885995"),
- Some("2011-12-13T11:13:10.12345"),
- None,
- Some("2021-1-1T05:11:10.432"),
- ];
-
- let ts_nanos = ts_strings
- .into_iter()
- .map(|t| {
- t.map(|t| {
- t.parse::<chrono::NaiveDateTime>()
- .unwrap()
- .timestamp_nanos()
- })
- })
- .collect::<Vec<_>>();
-
- let ts_micros = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
- .collect::<Vec<_>>();
-
- let ts_millis = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
- .collect::<Vec<_>>();
-
- let ts_secs = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
- .collect::<Vec<_>>();
-
- let names = ts_nanos
- .iter()
- .enumerate()
- .map(|(i, _)| format!("Row {i}"))
- .collect::<Vec<_>>();
-
- let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
- let arr_micros = TimestampMicrosecondArray::from(ts_micros);
- let arr_millis = TimestampMillisecondArray::from(ts_millis);
- let arr_secs = TimestampSecondArray::from(ts_secs);
-
- let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
- let arr_names = StringArray::from(names);
-
- let schema = Schema::new(vec![
- Field::new("nanos", arr_nanos.data_type().clone(), true),
- Field::new("micros", arr_micros.data_type().clone(), true),
- Field::new("millis", arr_millis.data_type().clone(), true),
- Field::new("secs", arr_secs.data_type().clone(), true),
- Field::new("name", arr_names.data_type().clone(), true),
- ]);
- let schema = Arc::new(schema);
-
- RecordBatch::try_new(
- schema,
- vec![
- Arc::new(arr_nanos),
- Arc::new(arr_micros),
- Arc::new(arr_millis),
- Arc::new(arr_secs),
- Arc::new(arr_names),
- ],
- )
- .unwrap()
-}
-
#[tokio::test]
async fn nyc() -> Result<()> {
// schema for nyctaxi csv files
diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs
index d93d59fb3e..6f17010a39 100644
--- a/datafusion/core/tests/sqllogictests/src/main.rs
+++ b/datafusion/core/tests/sqllogictests/src/main.rs
@@ -285,6 +285,17 @@ async fn context_for_test_file(relative_path: &Path) -> Option<TestContext> {
return None;
}
}
+ "joins.slt" => {
+ info!("Registering timestamps tables");
+ setup::register_timestamps_table(test_ctx.session_ctx()).await;
+ setup::register_hashjoin_datatype_table(test_ctx.session_ctx()).await;
+ setup::register_left_semi_anti_join_table(test_ctx.session_ctx()).await;
+ setup::register_right_semi_anti_join_table(test_ctx.session_ctx()).await;
+
+ let mut test_ctx = test_ctx;
+ setup::register_partition_table(&mut test_ctx).await;
+ return Some(test_ctx);
+ }
_ => {
info!("Using default SessionContext");
}
diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs
index 8072a0f74f..96090d676a 100644
--- a/datafusion/core/tests/sqllogictests/src/setup.rs
+++ b/datafusion/core/tests/sqllogictests/src/setup.rs
@@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use arrow_array::types::Int32Type;
+use arrow_array::{
+ Array, Date32Array, Date64Array, Decimal128Array, DictionaryArray, StringArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+ TimestampSecondArray,
+};
use datafusion::{
arrow::{
array::{
@@ -28,9 +34,11 @@ use datafusion::{
prelude::{CsvReadOptions, SessionContext},
test_util,
};
+use std::fs::File;
+use std::io::Write;
use std::sync::Arc;
-use crate::utils;
+use crate::{utils, TestContext};
#[cfg(feature = "avro")]
pub async fn register_avro_tables(ctx: &mut crate::TestContext) {
@@ -212,3 +220,311 @@ fn register_nan_table(ctx: &SessionContext) {
.unwrap();
ctx.register_batch("test_float", data).unwrap();
}
+
+pub async fn register_timestamps_table(ctx: &SessionContext) {
+ let batch = make_timestamps();
+ let schema = batch.schema();
+ let partitions = vec![vec![batch]];
+
+ ctx.register_table(
+ "test_timestamps_table",
+ Arc::new(MemTable::try_new(schema, partitions).unwrap()),
+ )
+ .unwrap();
+}
+
+/// Return a record batch with values for all of the supported
+/// timestamp types.
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "secs" --> TimestampSecondArray
+/// "names" --> StringArray
+pub fn make_timestamps() -> RecordBatch {
+ let ts_strings = vec![
+ Some("2018-11-13T17:11:10.011375885995"),
+ Some("2011-12-13T11:13:10.12345"),
+ None,
+ Some("2021-1-1T05:11:10.432"),
+ ];
+
+ let ts_nanos = ts_strings
+ .into_iter()
+ .map(|t| {
+ t.map(|t| {
+ t.parse::<chrono::NaiveDateTime>()
+ .unwrap()
+ .timestamp_nanos()
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let ts_micros = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+ .collect::<Vec<_>>();
+
+ let ts_millis = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+ .collect::<Vec<_>>();
+
+ let ts_secs = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+ .collect::<Vec<_>>();
+
+ let names = ts_nanos
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("Row {i}"))
+ .collect::<Vec<_>>();
+
+ let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
+ let arr_micros = TimestampMicrosecondArray::from(ts_micros);
+ let arr_millis = TimestampMillisecondArray::from(ts_millis);
+ let arr_secs = TimestampSecondArray::from(ts_secs);
+
+ let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let arr_names = StringArray::from(names);
+
+ let schema = Schema::new(vec![
+ Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new("secs", arr_secs.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
+ ]);
+ let schema = Arc::new(schema);
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_nanos),
+ Arc::new(arr_micros),
+ Arc::new(arr_millis),
+ Arc::new(arr_secs),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap()
+}
+
+/// Generate a partitioned CSV file and register it with an execution context
+pub async fn register_partition_table(test_ctx: &mut TestContext) {
+ test_ctx.enable_testdir();
+ let partition_count = 1;
+ let file_extension = "csv";
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ Field::new("c3", DataType::Boolean, false),
+ ]));
+ // generate a partitioned file
+ for partition in 0..partition_count {
+ let filename = format!("partition-{partition}.{file_extension}");
+ let file_path = test_ctx.testdir_path().join(filename);
+ let mut file = File::create(file_path).unwrap();
+
+ // generate some data
+ for i in 0..=10 {
+ let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+ file.write_all(data.as_bytes()).unwrap()
+ }
+ }
+
+ // register csv file with the execution context
+ test_ctx
+ .ctx
+ .register_csv(
+ "test_partition_table",
+ test_ctx.testdir_path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await
+ .unwrap();
+}
+
+pub async fn register_hashjoin_datatype_table(ctx: &SessionContext) {
+ let t1_schema = Schema::new(vec![
+ Field::new("c1", DataType::Date32, true),
+ Field::new("c2", DataType::Date64, true),
+ Field::new("c3", DataType::Decimal128(5, 2), true),
+ Field::new(
+ "c4",
+ DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+ true,
+ ),
+ ]);
+ let dict1: DictionaryArray<Int32Type> =
+ vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
+ let t1_data = RecordBatch::try_new(
+ Arc::new(t1_schema),
+ vec![
+ Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
+ Arc::new(Date64Array::from(vec![
+ Some(86400000),
+ Some(172800000),
+ Some(259200000),
+ None,
+ ])),
+ Arc::new(
+ Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
+ .with_precision_and_scale(5, 2)
+ .unwrap(),
+ ),
+ Arc::new(dict1),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("hashjoin_datatype_table_t1", t1_data)
+ .unwrap();
+
+ let t2_schema = Schema::new(vec![
+ Field::new("c1", DataType::Date32, true),
+ Field::new("c2", DataType::Date64, true),
+ Field::new("c3", DataType::Decimal128(10, 2), true),
+ Field::new(
+ "c4",
+ DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+ true,
+ ),
+ ]);
+ let dict2: DictionaryArray<Int32Type> = vec!["abc", "abcdefg", "qwerty", "qwe"]
+ .into_iter()
+ .collect();
+ let t2_data = RecordBatch::try_new(
+ Arc::new(t2_schema),
+ vec![
+ Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
+ Arc::new(Date64Array::from(vec![
+ Some(86400000),
+ None,
+ Some(259200000),
+ None,
+ ])),
+ Arc::new(
+ Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
+ .with_precision_and_scale(10, 2)
+ .unwrap(),
+ ),
+ Arc::new(dict2),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("hashjoin_datatype_table_t2", t2_data)
+ .unwrap();
+}
+
+pub async fn register_left_semi_anti_join_table(ctx: &SessionContext) {
+ let t1_schema = Arc::new(Schema::new(vec![
+ Field::new("t1_id", DataType::UInt32, true),
+ Field::new("t1_name", DataType::Utf8, true),
+ Field::new("t1_int", DataType::UInt32, true),
+ ]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema,
+ vec![
+ Arc::new(UInt32Array::from(vec![
+ Some(11),
+ Some(11),
+ Some(22),
+ Some(33),
+ Some(44),
+ None,
+ ])),
+ Arc::new(StringArray::from(vec![
+ Some("a"),
+ Some("a"),
+ Some("b"),
+ Some("c"),
+ Some("d"),
+ Some("e"),
+ ])),
+ Arc::new(UInt32Array::from(vec![1, 1, 2, 3, 4, 0])),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("left_semi_anti_join_table_t1", t1_data)
+ .unwrap();
+
+ let t2_schema = Arc::new(Schema::new(vec![
+ Field::new("t2_id", DataType::UInt32, true),
+ Field::new("t2_name", DataType::Utf8, true),
+ Field::new("t2_int", DataType::UInt32, true),
+ ]));
+ let t2_data = RecordBatch::try_new(
+ t2_schema,
+ vec![
+ Arc::new(UInt32Array::from(vec![
+ Some(11),
+ Some(11),
+ Some(22),
+ Some(44),
+ Some(55),
+ None,
+ ])),
+ Arc::new(StringArray::from(vec![
+ Some("z"),
+ Some("z"),
+ Some("y"),
+ Some("x"),
+ Some("w"),
+ Some("v"),
+ ])),
+ Arc::new(UInt32Array::from(vec![3, 3, 1, 3, 3, 0])),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("left_semi_anti_join_table_t2", t2_data)
+ .unwrap();
+}
+
+pub async fn register_right_semi_anti_join_table(ctx: &SessionContext) {
+ let t1_schema = Arc::new(Schema::new(vec![
+ Field::new("t1_id", DataType::UInt32, true),
+ Field::new("t1_name", DataType::Utf8, true),
+ Field::new("t1_int", DataType::UInt32, true),
+ ]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema,
+ vec![
+ Arc::new(UInt32Array::from(vec![
+ Some(11),
+ Some(22),
+ Some(33),
+ Some(44),
+ None,
+ ])),
+ Arc::new(StringArray::from(vec![
+ Some("a"),
+ Some("b"),
+ Some("c"),
+ Some("d"),
+ Some("e"),
+ ])),
+ Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("right_semi_anti_join_table_t1", t1_data)
+ .unwrap();
+
+ let t2_schema = Arc::new(Schema::new(vec![
+ Field::new("t2_id", DataType::UInt32, true),
+ Field::new("t2_name", DataType::Utf8, true),
+ ]));
+ // t2 has fewer rows than t1
+ let t2_data = RecordBatch::try_new(
+ t2_schema,
+ vec![
+ Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])),
+ Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])),
+ ],
+ )
+ .unwrap();
+ ctx.register_batch("right_semi_anti_join_table_t2", t2_data)
+ .unwrap();
+}
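
One subtlety in register_partition_table above: the generator writes rows for
i in 0..=10, but CsvReadOptions::new() treats the first line of each file as a
header by default (an assumption about DataFusion's CSV defaults), so the
registered table effectively contains c2 = 1..10 with c1 = 0. That is why the
expected results of the partition-table joins in joins.slt below start at 1.
A hypothetical sanity check in slt form:

    query II
    SELECT c1, c2 FROM test_partition_table ORDER BY c2 LIMIT 3
    ----
    0 1
    0 2
    0 3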
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 8e8527fc3d..4486d7c47b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -2337,3 +2337,603 @@ WHERE NOT EXISTS(
statement ok
set datafusion.explain.logical_plan_only = false;
+
+
+# test timestamp join on nanos datatype
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.nanos = t2.nanos;
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# test timestamp join on micros datatype
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.micros = t2.micros
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# test timestamp join on millis datatype
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.millis = t2.millis
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# left_join_using_2
+query II
+SELECT t1.c1, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2;
+----
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 9
+0 10
+
+# left_join_using_join_key_projection
+query III
+SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2
+----
+0 1 1
+0 2 2
+0 3 3
+0 4 4
+0 5 5
+0 6 6
+0 7 7
+0 8 8
+0 9 9
+0 10 10
+
+# left_join_2
+query III
+SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 ON t1.c2 = t2.c2 ORDER BY t1.c2
+----
+0 1 1
+0 2 2
+0 3 3
+0 4 4
+0 5 5
+0 6 6
+0 7 7
+0 8 8
+0 9 9
+0 10 10
+
+####
+# Config setup
+####
+
+statement ok
+set datafusion.explain.logical_plan_only = true
+
+# explain hash_join_with_date32
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+logical_plan
+Inner Join: t1.c1 = t2.c1
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+
+# hash_join_with_date32
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe
+
+
+# explain hash_join_with_date64
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2
+----
+logical_plan
+Left Join: t1.c2 = t2.c2
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+
+# hash_join_with_date64
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-03 1970-01-03T00:00:00 456 def NULL NULL NULL NULL
+1970-01-04 NULL -123.12 jkl NULL NULL NULL NULL
+NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 0 qwerty
+
+
+# explain hash_join_with_decimal
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3
+----
+logical_plan
+Right Join: t1.c3 = t2.c3
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+
+# hash_join_with_decimal
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 1.23 abc
+1970-01-03 1970-01-03T00:00:00 456 def 1970-01-03 1970-01-03T00:00:00 456 def
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL -123.12 jkl
+NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 789 ghi
+
+# explain hash_join_with_dictionary
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t1 t2 on t1.c4 = t2.c4
+----
+logical_plan
+Inner Join: t1.c4 = t2.c4
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+
+# hash_join_with_dictionary
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c4 = t2.c4
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = false
+
+
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+# explain sort_merge_join_on_date32 inner sort merge join on data type (Date32)
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+logical_plan
+Inner Join: t1.c1 = t2.c1
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+physical_plan
+SortMergeJoin: join_type=Inner, on=[(Column { name: "c1", index: 0 }, Column { name: "c1", index: 0 })]
+--SortExec: expr=[c1@0 ASC]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
+--SortExec: expr=[c1@0 ASC]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
+
+# sort_merge_join_on_date32 inner sort merge join on data type (Date32)
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe
+
+# explain sort_merge_join_on_decimal right join on data type (Decimal)
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3
+----
+logical_plan
+Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+physical_plan
+ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]
+--SortMergeJoin: join_type=Right, on=[(Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }, Column { name: "c3", index: 2 })]
+----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]
+------CoalesceBatchesExec: target_batch_size=4096
+--------RepartitionExec: partitioning=Hash([Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }], 2), input_partitions=2
+----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+----SortExec: expr=[c3@2 ASC]
+------CoalesceBatchesExec: target_batch_size=4096
+--------RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 2), input_partitions=2
+----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+# sort_merge_join_on_decimal right join on data type (Decimal)
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3
+----
+1970-01-04 NULL -123.12 jkl 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+NULL 1970-01-04T00:00:00 789 ghi 1970-01-04 NULL 789 qwe
+NULL NULL NULL NULL NULL 1970-01-04T00:00:00 0 qwerty
+NULL NULL NULL NULL NULL NULL 100000 abcdefg
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test left_semi_join scenarios with the repartition_joins parameter set to true.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
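+# Note (editorial, hedged): the IN subquery above is decorrelated into a
+# LeftSemi hash join; with repartition_joins = true, both inputs are hash
+# partitioned on the join key (mode=Partitioned) across the two target
+# partitions configured above.
+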
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query I rowsort
+SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id
+----
+11
+22
+44
+NULL
+
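+# Note (editorial, hedged): unlike the semi-join forms above, INTERSECT has set
+# semantics, so 11 appears only once and the NULL id survives (set operations
+# treat NULLs as equal), whereas the join predicate NULL = NULL never matches.
+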
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test left_semi_join scenarios with the repartition_joins parameter set to false.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
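+# Note (editorial, hedged): with repartition_joins = false, the planner falls
+# back to mode=CollectLeft, collecting the build side into a single partition
+# instead of hash partitioning both inputs, so the RepartitionExec operators
+# seen in the previous section disappear.
+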
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query I rowsort
+SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id
+----
+11
+22
+44
+NULL
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test right_semi_join scenarios with the repartition_joins parameter set to true.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
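+# Note (editorial, hedged): although the query is written against t1 with an
+# EXISTS over t2, the plan places t2 on the build (left) side and uses
+# join_type=RightSemi, so the preserved rows still come from t1; the extra
+# t2_name <> t1_name predicate is carried as the join's filter expression.
+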
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test right_semi_join scenarios with the repartition_joins parameter set to false.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+