Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/13 00:55:17 UTC

[arrow-datafusion] branch main updated: Port tests in joins.rs to sqllogictest (#6642)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fc009b41e  Port tests in joins.rs to sqllogictest (#6642)
3fc009b41e is described below

commit 3fc009b41e392d50b67cea9dddcc6e7014b3b78b
Author: zhenxing jiang <ji...@gmail.com>
AuthorDate: Tue Jun 13 08:55:10 2023 +0800

     Port tests in joins.rs to sqllogictest (#6642)
    
    * port left_join_using_2 and timestamp_join to sqllogictest
    
    * move remaining joins test cases to sqllogictest
    
    * clippy fix
---
 datafusion/core/tests/sql/joins.rs                 | 625 ---------------------
 datafusion/core/tests/sql/mod.rs                   | 289 ----------
 datafusion/core/tests/sqllogictests/src/main.rs    |  11 +
 datafusion/core/tests/sqllogictests/src/setup.rs   | 318 ++++++++++-
 .../core/tests/sqllogictests/test_files/joins.slt  | 600 ++++++++++++++++++++
 5 files changed, 928 insertions(+), 915 deletions(-)
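
For context, sqllogictest (.slt) files express each test declaratively: a
directive line, the SQL under test, and, for queries, the expected output
after a "----" separator. A minimal sketch of the format follows; the table
name "example_table" and its rows are purely illustrative and are not part
of this commit:

    # query with one integer (I) and one text (T) result column; "rowsort"
    # sorts the result rows before comparing against the expected block
    # (example_table and its contents are hypothetical)
    query IT rowsort
    SELECT id, name FROM example_table;
    ----
    1 a
    2 b

    # statements with no result set are checked with "statement ok"
    statement ok
    set datafusion.explain.logical_plan_only = true

The new test_files/joins.slt added below uses exactly these directives to
replace the Rust assert_batches_eq!/assert_batches_sorted_eq! tests deleted
from joins.rs, while the helper functions removed from sql/mod.rs reappear,
lightly renamed, as registration functions in the sqllogictest harness's
setup.rs.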

diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 054db2c3f7..116a3c1a79 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -37,144 +37,6 @@ async fn nestedjoin_with_alias() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn join_timestamp() -> Result<()> {
-    let ctx = SessionContext::new();
-    ctx.register_table("t", table_with_timestamps()).unwrap();
-
-    let expected = vec![
-        "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-        "| nanos                         | micros                     | millis                  | secs                | name  | nanos                         | micros                     | millis                  | secs                | name  |",
-        "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-        "| 2011-12-13T11:13:10.123450    | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 | 2011-12-13T11:13:10.123450    | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 |",
-        "| 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 | 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 |",
-        "| 2021-01-01T05:11:10.432       | 2021-01-01T05:11:10.432    | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 | 2021-01-01T05:11:10.432       | 2021-01-01T05:11:10.432    | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 |",
-        "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-    ];
-
-    let results = execute_to_batches(
-        &ctx,
-        "SELECT * FROM t as t1  \
-         JOIN (SELECT * FROM t) as t2 \
-         ON t1.nanos = t2.nanos",
-    )
-    .await;
-
-    assert_batches_sorted_eq!(expected, &results);
-
-    let results = execute_to_batches(
-        &ctx,
-        "SELECT * FROM t as t1  \
-         JOIN (SELECT * FROM t) as t2 \
-         ON t1.micros = t2.micros",
-    )
-    .await;
-
-    assert_batches_sorted_eq!(expected, &results);
-
-    let results = execute_to_batches(
-        &ctx,
-        "SELECT * FROM t as t1  \
-         JOIN (SELECT * FROM t) as t2 \
-         ON t1.millis = t2.millis",
-    )
-    .await;
-
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn left_join_using_2() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
-        1,
-    )
-    .await?;
-    assert_eq!(results.len(), 1);
-
-    let expected = vec![
-        "+----+----+",
-        "| c1 | c2 |",
-        "+----+----+",
-        "| 0  | 1  |",
-        "| 0  | 2  |",
-        "| 0  | 3  |",
-        "| 0  | 4  |",
-        "| 0  | 5  |",
-        "| 0  | 6  |",
-        "| 0  | 7  |",
-        "| 0  | 8  |",
-        "| 0  | 9  |",
-        "| 0  | 10 |",
-        "+----+----+",
-    ];
-
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn left_join_using_join_key_projection() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
-        1,
-    )
-    .await?;
-    assert_eq!(results.len(), 1);
-
-    let expected = vec![
-        "+----+----+----+",
-        "| c1 | c2 | c2 |",
-        "+----+----+----+",
-        "| 0  | 1  | 1  |",
-        "| 0  | 2  | 2  |",
-        "| 0  | 3  | 3  |",
-        "| 0  | 4  | 4  |",
-        "| 0  | 5  | 5  |",
-        "| 0  | 6  | 6  |",
-        "| 0  | 7  | 7  |",
-        "| 0  | 8  | 8  |",
-        "| 0  | 9  | 9  |",
-        "| 0  | 10 | 10 |",
-        "+----+----+----+",
-    ];
-
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
-#[tokio::test]
-async fn left_join_2() -> Result<()> {
-    let results = execute_with_partition(
-        "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
-        1,
-    )
-        .await?;
-    assert_eq!(results.len(), 1);
-
-    let expected = vec![
-        "+----+----+----+",
-        "| c1 | c2 | c2 |",
-        "+----+----+----+",
-        "| 0  | 1  | 1  |",
-        "| 0  | 2  | 2  |",
-        "| 0  | 3  | 3  |",
-        "| 0  | 4  | 4  |",
-        "| 0  | 5  | 5  |",
-        "| 0  | 6  | 6  |",
-        "| 0  | 7  | 7  |",
-        "| 0  | 8  | 8  |",
-        "| 0  | 9  | 9  |",
-        "| 0  | 10 | 10 |",
-        "+----+----+----+",
-    ];
-
-    assert_batches_eq!(expected, &results);
-    Ok(())
-}
-
 #[tokio::test]
 async fn join_partitioned() -> Result<()> {
     // self join on partition id (workaround for duplicate column name)
@@ -192,388 +54,6 @@ async fn join_partitioned() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn hash_join_with_date32() -> Result<()> {
-    let ctx = create_hashjoin_datatype_context()?;
-
-    // inner join on hash supported data type (Date32)
-    let sql = "select * from t1 join t2 on t1.c1 = t2.c1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Inner Join: t1.c1 = t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-        "| c1         | c2                  | c3      | c4  | c1         | c2                  | c3      | c4  |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-        "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23    | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
-        "| 1970-01-04 |                     | -123.12 | jkl | 1970-01-04 |                     | 789.00  |     |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_date64() -> Result<()> {
-    let ctx = create_hashjoin_datatype_context()?;
-
-    // left join on hash supported data type (Date64)
-    let sql = "select * from t1 left join t2 on t1.c2 = t2.c2";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Left Join: t1.c2 = t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
-        "| c1         | c2                  | c3      | c4  | c1         | c2                  | c3      | c4     |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
-        "|            | 1970-01-04T00:00:00 | 789.00  | ghi |            | 1970-01-04T00:00:00 | 0.00    | qwerty |",
-        "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23    | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc    |",
-        "| 1970-01-03 | 1970-01-03T00:00:00 | 456.00  | def |            |                     |         |        |",
-        "| 1970-01-04 |                     | -123.12 | jkl |            |                     |         |        |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_decimal() -> Result<()> {
-    let ctx = create_hashjoin_datatype_context()?;
-
-    // right join on hash supported data type (Decimal)
-    let sql = "select * from t1 right join t2 on t1.c3 = t2.c3";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-        "| c1         | c2                  | c3      | c4  | c1         | c2                  | c3        | c4      |",
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-        "|            |                     |         |     |            |                     | 100000.00 | abcdefg |",
-        "|            |                     |         |     |            | 1970-01-04T00:00:00 | 0.00      | qwerty  |",
-        "|            | 1970-01-04T00:00:00 | 789.00  | ghi | 1970-01-04 |                     | 789.00    |         |",
-        "| 1970-01-04 |                     | -123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12   | abc     |",
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn hash_join_with_dictionary() -> Result<()> {
-    let ctx = create_hashjoin_datatype_context()?;
-
-    // inner join on hash supported data type (Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)))
-    let sql = "select * from t1 join t2 on t1.c4 = t2.c4";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Inner Join: t1.c4 = t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]",
-        "    TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
-        "| c1         | c2                  | c3   | c4  | c1         | c2                  | c3      | c4  |",
-        "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
-        "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
-        "+------------+---------------------+------+-----+------------+---------------------+---------+-----+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn sort_merge_join_on_date32() -> Result<()> {
-    let ctx = create_sort_merge_join_datatype_context()?;
-
-    // inner sort merge join on data type (Date32)
-    let sql = "select * from t1 join t2 on t1.c1 = t2.c1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let expected = vec![
-        "SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]",
-        "  SortExec: expr=[c1@0 ASC]",
-        "    CoalesceBatchesExec: target_batch_size=4096",
-        "      RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-        "        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "          MemoryExec: partitions=1, partition_sizes=[1]",
-        "  SortExec: expr=[c1@0 ASC]",
-        "    CoalesceBatchesExec: target_batch_size=4096",
-        "      RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-        "        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "          MemoryExec: partitions=1, partition_sizes=[1]",
-    ];
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-        "| c1         | c2                  | c3      | c4  | c1         | c2                  | c3      | c4  |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-        "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23    | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |",
-        "| 1970-01-04 |                     | -123.12 | jkl | 1970-01-04 |                     | 789.00  |     |",
-        "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn sort_merge_join_on_decimal() -> Result<()> {
-    let ctx = create_sort_merge_join_datatype_context()?;
-
-    // right join on data type (Decimal)
-    let sql = "select * from t1 right join t2 on t1.c3 = t2.c3";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let expected = vec![
-        "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]",
-        "  SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
-        "    SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
-        "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
-        "          ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]",
-        "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "              MemoryExec: partitions=1, partition_sizes=[1]",
-        "    SortExec: expr=[c3@2 ASC]",
-        "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2), input_partitions=2",
-        "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "            MemoryExec: partitions=1, partition_sizes=[1]",
-    ];
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = vec![
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-        "| c1         | c2                  | c3      | c4  | c1         | c2                  | c3        | c4      |",
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-        "|            |                     |         |     |            |                     | 100000.00 | abcdefg |",
-        "|            |                     |         |     |            | 1970-01-04T00:00:00 | 0.00      | qwerty  |",
-        "|            | 1970-01-04T00:00:00 | 789.00  | ghi | 1970-01-04 |                     | 789.00    |         |",
-        "| 1970-01-04 |                     | -123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12   | abc     |",
-        "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn left_semi_join() -> Result<()> {
-    let test_repartition_joins = vec![true, false];
-    for repartition_joins in test_repartition_joins {
-        let ctx = create_left_semi_anti_join_context_with_null_ids(
-            "t1_id",
-            "t2_id",
-            repartition_joins,
-        )
-        .unwrap();
-
-        let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let expected = if repartition_joins {
-            vec![
-                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        } else {
-            vec![
-                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "  CoalesceBatchesExec: target_batch_size=4096",
-                "    HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        };
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        assert_eq!(
-            expected, actual,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+---------+",
-            "| t1_id | t1_name |",
-            "+-------+---------+",
-            "| 11    | a       |",
-            "| 11    | a       |",
-            "| 22    | b       |",
-            "| 44    | d       |",
-            "+-------+---------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-
-        let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id) ORDER BY t1_id";
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+---------+",
-            "| t1_id | t1_name |",
-            "+-------+---------+",
-            "| 11    | a       |",
-            "| 11    | a       |",
-            "| 22    | b       |",
-            "| 44    | d       |",
-            "+-------+---------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-
-        let sql = "SELECT t1_id FROM t1 INTERSECT SELECT t2_id FROM t2 ORDER BY t1_id";
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+",
-            "| t1_id |",
-            "+-------+",
-            "| 11    |",
-            "| 22    |",
-            "| 44    |",
-            "|       |",
-            "+-------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-
-        let sql = "SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let expected = if repartition_joins {
-            vec![
-                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        } else {
-            vec![
-                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "  CoalesceBatchesExec: target_batch_size=4096",
-                "    HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        };
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        assert_eq!(
-            expected, actual,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+---------+",
-            "| t1_id | t1_name |",
-            "+-------+---------+",
-            "| 11    | a       |",
-            "| 11    | a       |",
-            "| 22    | b       |",
-            "| 44    | d       |",
-            "+-------+---------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-    }
-
-    Ok(())
-}
-
 #[tokio::test]
 #[ignore = "Test ignored, will be enabled after fixing the NAAJ bug"]
 // https://github.com/apache/arrow-datafusion/issues/4211
@@ -595,108 +75,3 @@ async fn null_aware_left_anti_join() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn right_semi_join() -> Result<()> {
-    let test_repartition_joins = vec![true, false];
-    for repartition_joins in test_repartition_joins {
-        let ctx = create_right_semi_anti_join_context_with_null_ids(
-            "t1_id",
-            "t2_id",
-            repartition_joins,
-        )
-        .unwrap();
-
-        let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let expected = if repartition_joins {
-            vec![
-                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                 "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                 "    CoalesceBatchesExec: target_batch_size=4096",
-                 "      HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0",
-                 "        CoalesceBatchesExec: target_batch_size=4096",
-                 "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
-                 "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                 "              MemoryExec: partitions=1, partition_sizes=[1]",
-                 "        CoalesceBatchesExec: target_batch_size=4096",
-                 "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
-                 "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                 "              MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        } else {
-            vec![
-                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "  CoalesceBatchesExec: target_batch_size=4096",
-                "    HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        };
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        assert_eq!(
-            expected, actual,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+---------+--------+",
-            "| t1_id | t1_name | t1_int |",
-            "+-------+---------+--------+",
-            "| 11    | a       | 1      |",
-            "+-------+---------+--------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-
-        let sql = "SELECT t1_id, t1_name, t1_int FROM t2 RIGHT SEMI JOIN t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let expected = if repartition_joins {
-            vec![
-                "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-                "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-                "              MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        } else {
-            vec![
-                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
-                "  CoalesceBatchesExec: target_batch_size=4096",
-                "    HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-                "      MemoryExec: partitions=1, partition_sizes=[1]",
-            ]
-        };
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        assert_eq!(
-            expected, actual,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-------+---------+--------+",
-            "| t1_id | t1_name | t1_int |",
-            "+-------+---------+--------+",
-            "| 11    | a       | 1      |",
-            "+-------+---------+--------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-    }
-
-    Ok(())
-}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 42de0d23a3..db2e7cfcff 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -25,7 +25,6 @@ use arrow::{
 use chrono::prelude::*;
 use chrono::Duration;
 
-use datafusion::config::ConfigOptions;
 use datafusion::datasource::TableProvider;
 use datafusion::logical_expr::{Aggregate, LogicalPlan, TableScan};
 use datafusion::physical_plan::metrics::MetricValue;
@@ -351,207 +350,6 @@ fn create_left_semi_anti_join_context_with_null_ids(
     Ok(ctx)
 }
 
-fn create_right_semi_anti_join_context_with_null_ids(
-    column_left: &str,
-    column_right: &str,
-    repartition_joins: bool,
-) -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_repartition_joins(repartition_joins)
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
-
-    let t1_schema = Arc::new(Schema::new(vec![
-        Field::new(column_left, DataType::UInt32, true),
-        Field::new("t1_name", DataType::Utf8, true),
-        Field::new("t1_int", DataType::UInt32, true),
-    ]));
-    let t1_data = RecordBatch::try_new(
-        t1_schema,
-        vec![
-            Arc::new(UInt32Array::from(vec![
-                Some(11),
-                Some(22),
-                Some(33),
-                Some(44),
-                None,
-            ])),
-            Arc::new(StringArray::from(vec![
-                Some("a"),
-                Some("b"),
-                Some("c"),
-                Some("d"),
-                Some("e"),
-            ])),
-            Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])),
-        ],
-    )?;
-    ctx.register_batch("t1", t1_data)?;
-
-    let t2_schema = Arc::new(Schema::new(vec![
-        Field::new(column_right, DataType::UInt32, true),
-        Field::new("t2_name", DataType::Utf8, true),
-    ]));
-    // t2 data size is smaller than t1
-    let t2_data = RecordBatch::try_new(
-        t2_schema,
-        vec![
-            Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])),
-            Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])),
-        ],
-    )?;
-    ctx.register_batch("t2", t2_data)?;
-
-    Ok(ctx)
-}
-
-fn create_hashjoin_datatype_context() -> Result<SessionContext> {
-    let ctx = SessionContext::new();
-
-    let t1_schema = Schema::new(vec![
-        Field::new("c1", DataType::Date32, true),
-        Field::new("c2", DataType::Date64, true),
-        Field::new("c3", DataType::Decimal128(5, 2), true),
-        Field::new(
-            "c4",
-            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            true,
-        ),
-    ]);
-    let dict1: DictionaryArray<Int32Type> =
-        vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
-    let t1_data = RecordBatch::try_new(
-        Arc::new(t1_schema),
-        vec![
-            Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
-            Arc::new(Date64Array::from(vec![
-                Some(86400000),
-                Some(172800000),
-                Some(259200000),
-                None,
-            ])),
-            Arc::new(
-                Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
-                    .with_precision_and_scale(5, 2)
-                    .unwrap(),
-            ),
-            Arc::new(dict1),
-        ],
-    )?;
-    ctx.register_batch("t1", t1_data)?;
-
-    let t2_schema = Schema::new(vec![
-        Field::new("c1", DataType::Date32, true),
-        Field::new("c2", DataType::Date64, true),
-        Field::new("c3", DataType::Decimal128(10, 2), true),
-        Field::new(
-            "c4",
-            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            true,
-        ),
-    ]);
-    let dict2: DictionaryArray<Int32Type> =
-        vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect();
-    let t2_data = RecordBatch::try_new(
-        Arc::new(t2_schema),
-        vec![
-            Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
-            Arc::new(Date64Array::from(vec![
-                Some(86400000),
-                None,
-                Some(259200000),
-                None,
-            ])),
-            Arc::new(
-                Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
-                    .with_precision_and_scale(10, 2)
-                    .unwrap(),
-            ),
-            Arc::new(dict2),
-        ],
-    )?;
-    ctx.register_batch("t2", t2_data)?;
-
-    Ok(ctx)
-}
-
-fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
-    let mut config = ConfigOptions::new();
-    config.optimizer.prefer_hash_join = false;
-    config.execution.target_partitions = 2;
-    config.execution.batch_size = 4096;
-
-    let ctx = SessionContext::with_config(config.into());
-
-    let t1_schema = Schema::new(vec![
-        Field::new("c1", DataType::Date32, true),
-        Field::new("c2", DataType::Date64, true),
-        Field::new("c3", DataType::Decimal128(5, 2), true),
-        Field::new(
-            "c4",
-            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            true,
-        ),
-    ]);
-    let dict1: DictionaryArray<Int32Type> =
-        vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
-    let t1_data = RecordBatch::try_new(
-        Arc::new(t1_schema),
-        vec![
-            Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
-            Arc::new(Date64Array::from(vec![
-                Some(86400000),
-                Some(172800000),
-                Some(259200000),
-                None,
-            ])),
-            Arc::new(
-                Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
-                    .with_precision_and_scale(5, 2)
-                    .unwrap(),
-            ),
-            Arc::new(dict1),
-        ],
-    )?;
-    ctx.register_batch("t1", t1_data)?;
-
-    let t2_schema = Schema::new(vec![
-        Field::new("c1", DataType::Date32, true),
-        Field::new("c2", DataType::Date64, true),
-        Field::new("c3", DataType::Decimal128(10, 2), true),
-        Field::new(
-            "c4",
-            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-            true,
-        ),
-    ]);
-    let dict2: DictionaryArray<Int32Type> =
-        vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect();
-    let t2_data = RecordBatch::try_new(
-        Arc::new(t2_schema),
-        vec![
-            Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
-            Arc::new(Date64Array::from(vec![
-                Some(86400000),
-                None,
-                Some(259200000),
-                None,
-            ])),
-            Arc::new(
-                Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
-                    .with_precision_and_scale(10, 2)
-                    .unwrap(),
-            ),
-            Arc::new(dict2),
-        ],
-    )?;
-    ctx.register_batch("t2", t2_data)?;
-
-    Ok(ctx)
-}
-
 fn get_tpch_table_schema(table: &str) -> Schema {
     match table {
         "customer" => Schema::new(vec![
@@ -1106,93 +904,6 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
         .collect::<Vec<_>>()
 }
 
-/// Return a new table provider containing all of the supported timestamp types
-pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
-    let batch = make_timestamps();
-    let schema = batch.schema();
-    let partitions = vec![vec![batch]];
-    Arc::new(MemTable::try_new(schema, partitions).unwrap())
-}
-
-/// Return  record batch with all of the supported timestamp types
-/// values
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "secs" --> TimestampSecondArray
-/// "names" --> StringArray
-pub fn make_timestamps() -> RecordBatch {
-    let ts_strings = vec![
-        Some("2018-11-13T17:11:10.011375885995"),
-        Some("2011-12-13T11:13:10.12345"),
-        None,
-        Some("2021-1-1T05:11:10.432"),
-    ];
-
-    let ts_nanos = ts_strings
-        .into_iter()
-        .map(|t| {
-            t.map(|t| {
-                t.parse::<chrono::NaiveDateTime>()
-                    .unwrap()
-                    .timestamp_nanos()
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let ts_micros = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
-        .collect::<Vec<_>>();
-
-    let ts_millis = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
-        .collect::<Vec<_>>();
-
-    let ts_secs = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
-        .collect::<Vec<_>>();
-
-    let names = ts_nanos
-        .iter()
-        .enumerate()
-        .map(|(i, _)| format!("Row {i}"))
-        .collect::<Vec<_>>();
-
-    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
-    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
-    let arr_millis = TimestampMillisecondArray::from(ts_millis);
-    let arr_secs = TimestampSecondArray::from(ts_secs);
-
-    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
-    let arr_names = StringArray::from(names);
-
-    let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), true),
-        Field::new("micros", arr_micros.data_type().clone(), true),
-        Field::new("millis", arr_millis.data_type().clone(), true),
-        Field::new("secs", arr_secs.data_type().clone(), true),
-        Field::new("name", arr_names.data_type().clone(), true),
-    ]);
-    let schema = Arc::new(schema);
-
-    RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(arr_nanos),
-            Arc::new(arr_micros),
-            Arc::new(arr_millis),
-            Arc::new(arr_secs),
-            Arc::new(arr_names),
-        ],
-    )
-    .unwrap()
-}
-
 #[tokio::test]
 async fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files
diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs
index d93d59fb3e..6f17010a39 100644
--- a/datafusion/core/tests/sqllogictests/src/main.rs
+++ b/datafusion/core/tests/sqllogictests/src/main.rs
@@ -285,6 +285,17 @@ async fn context_for_test_file(relative_path: &Path) -> Option<TestContext> {
                 return None;
             }
         }
+        "joins.slt" => {
+            info!("Registering timestamps tables");
+            setup::register_timestamps_table(test_ctx.session_ctx()).await;
+            setup::register_hashjoin_datatype_table(test_ctx.session_ctx()).await;
+            setup::register_left_semi_anti_join_table(test_ctx.session_ctx()).await;
+            setup::register_right_semi_anti_join_table(test_ctx.session_ctx()).await;
+
+            let mut test_ctx = test_ctx;
+            setup::register_partition_table(&mut test_ctx).await;
+            return Some(test_ctx);
+        }
         _ => {
             info!("Using default SessionContext");
         }
diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs
index 8072a0f74f..96090d676a 100644
--- a/datafusion/core/tests/sqllogictests/src/setup.rs
+++ b/datafusion/core/tests/sqllogictests/src/setup.rs
@@ -15,6 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow_array::types::Int32Type;
+use arrow_array::{
+    Array, Date32Array, Date64Array, Decimal128Array, DictionaryArray, StringArray,
+    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
 use datafusion::{
     arrow::{
         array::{
@@ -28,9 +34,11 @@ use datafusion::{
     prelude::{CsvReadOptions, SessionContext},
     test_util,
 };
+use std::fs::File;
+use std::io::Write;
 use std::sync::Arc;
 
-use crate::utils;
+use crate::{utils, TestContext};
 
 #[cfg(feature = "avro")]
 pub async fn register_avro_tables(ctx: &mut crate::TestContext) {
@@ -212,3 +220,311 @@ fn register_nan_table(ctx: &SessionContext) {
     .unwrap();
     ctx.register_batch("test_float", data).unwrap();
 }
+
+pub async fn register_timestamps_table(ctx: &SessionContext) {
+    let batch = make_timestamps();
+    let schema = batch.schema();
+    let partitions = vec![vec![batch]];
+
+    ctx.register_table(
+        "test_timestamps_table",
+        Arc::new(MemTable::try_new(schema, partitions).unwrap()),
+    )
+    .unwrap();
+}
+
+/// Return  record batch with all of the supported timestamp types
+/// values
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "secs" --> TimestampSecondArray
+/// "names" --> StringArray
+pub fn make_timestamps() -> RecordBatch {
+    let ts_strings = vec![
+        Some("2018-11-13T17:11:10.011375885995"),
+        Some("2011-12-13T11:13:10.12345"),
+        None,
+        Some("2021-1-1T05:11:10.432"),
+    ];
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                t.parse::<chrono::NaiveDateTime>()
+                    .unwrap()
+                    .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_secs = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {i}"))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
+    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
+    let arr_millis = TimestampMillisecondArray::from(ts_millis);
+    let arr_secs = TimestampSecondArray::from(ts_secs);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("secs", arr_secs.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_secs),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
+
+/// Generate a partitioned CSV file and register it with an execution context
+pub async fn register_partition_table(test_ctx: &mut TestContext) {
+    test_ctx.enable_testdir();
+    let partition_count = 1;
+    let file_extension = "csv";
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+    // generate a partitioned file
+    for partition in 0..partition_count {
+        let filename = format!("partition-{partition}.{file_extension}");
+        let file_path = test_ctx.testdir_path().join(filename);
+        let mut file = File::create(file_path).unwrap();
+
+        // generate some data
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes()).unwrap()
+        }
+    }
+
+    // register csv file with the execution context
+    test_ctx
+        .ctx
+        .register_csv(
+            "test_partition_table",
+            test_ctx.testdir_path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await
+        .unwrap();
+}
+
+pub async fn register_hashjoin_datatype_table(ctx: &SessionContext) {
+    let t1_schema = Schema::new(vec![
+        Field::new("c1", DataType::Date32, true),
+        Field::new("c2", DataType::Date64, true),
+        Field::new("c3", DataType::Decimal128(5, 2), true),
+        Field::new(
+            "c4",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+    ]);
+    let dict1: DictionaryArray<Int32Type> =
+        vec!["abc", "def", "ghi", "jkl"].into_iter().collect();
+    let t1_data = RecordBatch::try_new(
+        Arc::new(t1_schema),
+        vec![
+            Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])),
+            Arc::new(Date64Array::from(vec![
+                Some(86400000),
+                Some(172800000),
+                Some(259200000),
+                None,
+            ])),
+            Arc::new(
+                Decimal128Array::from_iter_values([123, 45600, 78900, -12312])
+                    .with_precision_and_scale(5, 2)
+                    .unwrap(),
+            ),
+            Arc::new(dict1),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("hashjoin_datatype_table_t1", t1_data)
+        .unwrap();
+
+    let t2_schema = Schema::new(vec![
+        Field::new("c1", DataType::Date32, true),
+        Field::new("c2", DataType::Date64, true),
+        Field::new("c3", DataType::Decimal128(10, 2), true),
+        Field::new(
+            "c4",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+    ]);
+    let dict2: DictionaryArray<Int32Type> = vec!["abc", "abcdefg", "qwerty", "qwe"]
+        .into_iter()
+        .collect();
+    let t2_data = RecordBatch::try_new(
+        Arc::new(t2_schema),
+        vec![
+            Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])),
+            Arc::new(Date64Array::from(vec![
+                Some(86400000),
+                None,
+                Some(259200000),
+                None,
+            ])),
+            Arc::new(
+                Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900])
+                    .with_precision_and_scale(10, 2)
+                    .unwrap(),
+            ),
+            Arc::new(dict2),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("hashjoin_datatype_table_t2", t2_data)
+        .unwrap();
+}
+
+pub async fn register_left_semi_anti_join_table(ctx: &SessionContext) {
+    let t1_schema = Arc::new(Schema::new(vec![
+        Field::new("t1_id", DataType::UInt32, true),
+        Field::new("t1_name", DataType::Utf8, true),
+        Field::new("t1_int", DataType::UInt32, true),
+    ]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema,
+        vec![
+            Arc::new(UInt32Array::from(vec![
+                Some(11),
+                Some(11),
+                Some(22),
+                Some(33),
+                Some(44),
+                None,
+            ])),
+            Arc::new(StringArray::from(vec![
+                Some("a"),
+                Some("a"),
+                Some("b"),
+                Some("c"),
+                Some("d"),
+                Some("e"),
+            ])),
+            Arc::new(UInt32Array::from(vec![1, 1, 2, 3, 4, 0])),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("left_semi_anti_join_table_t1", t1_data)
+        .unwrap();
+
+    let t2_schema = Arc::new(Schema::new(vec![
+        Field::new("t2_id", DataType::UInt32, true),
+        Field::new("t2_name", DataType::Utf8, true),
+        Field::new("t2_int", DataType::UInt32, true),
+    ]));
+    let t2_data = RecordBatch::try_new(
+        t2_schema,
+        vec![
+            Arc::new(UInt32Array::from(vec![
+                Some(11),
+                Some(11),
+                Some(22),
+                Some(44),
+                Some(55),
+                None,
+            ])),
+            Arc::new(StringArray::from(vec![
+                Some("z"),
+                Some("z"),
+                Some("y"),
+                Some("x"),
+                Some("w"),
+                Some("v"),
+            ])),
+            Arc::new(UInt32Array::from(vec![3, 3, 1, 3, 3, 0])),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("left_semi_anti_join_table_t2", t2_data)
+        .unwrap();
+}
+
+pub async fn register_right_semi_anti_join_table(ctx: &SessionContext) {
+    let t1_schema = Arc::new(Schema::new(vec![
+        Field::new("t1_id", DataType::UInt32, true),
+        Field::new("t1_name", DataType::Utf8, true),
+        Field::new("t1_int", DataType::UInt32, true),
+    ]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema,
+        vec![
+            Arc::new(UInt32Array::from(vec![
+                Some(11),
+                Some(22),
+                Some(33),
+                Some(44),
+                None,
+            ])),
+            Arc::new(StringArray::from(vec![
+                Some("a"),
+                Some("b"),
+                Some("c"),
+                Some("d"),
+                Some("e"),
+            ])),
+            Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("right_semi_anti_join_table_t1", t1_data)
+        .unwrap();
+
+    let t2_schema = Arc::new(Schema::new(vec![
+        Field::new("t2_id", DataType::UInt32, true),
+        Field::new("t2_name", DataType::Utf8, true),
+    ]));
+    // t2 data size is smaller than t1
+    let t2_data = RecordBatch::try_new(
+        t2_schema,
+        vec![
+            Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])),
+            Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])),
+        ],
+    )
+    .unwrap();
+    ctx.register_batch("right_semi_anti_join_table_t2", t2_data)
+        .unwrap();
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt
index 8e8527fc3d..4486d7c47b 100644
--- a/datafusion/core/tests/sqllogictests/test_files/joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt
@@ -2337,3 +2337,603 @@ WHERE NOT EXISTS(
 
 statement ok
 set datafusion.explain.logical_plan_only = false;
+
+
+# test timestamp join on the nanos (nanosecond-precision) column
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table) as t2 ON t1.nanos = t2.nanos
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# test timestamp join on the micros (microsecond-precision) column
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table) as t2 ON t1.micros = t2.micros
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# test timestamp join on the millis (millisecond-precision) column
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table) as t2 ON t1.millis = t2.millis
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
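+
+# The nanos, micros, millis and secs columns hold the same instants truncated
+# to each precision, so all three self-joins above match the same rows.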
+
+# left_join_using_2
+query II
+SELECT t1.c1, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2
+----
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 9
+0 10
+
+# left_join_using_join_key_projection
+query III
+SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2
+----
+0 1 1
+0 2 2
+0 3 3
+0 4 4
+0 5 5
+0 6 6
+0 7 7
+0 8 8
+0 9 9
+0 10 10
+
+# left_join_2
+query III
+SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 ON t1.c2 = t2.c2 ORDER BY t1.c2
+----
+0 1 1
+0 2 2
+0 3 3
+0 4 4
+0 5 5
+0 6 6
+0 7 7
+0 8 8
+0 9 9
+0 10 10
+
+####
+# Config setup
+####
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+# explain hash_join_with_date32
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+logical_plan
+Inner Join: t1.c1 = t2.c1
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+
+# hash_join_with_date32
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe
+
+
+# explain hash_join_with_date64
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2
+----
+logical_plan
+Left Join: t1.c2 = t2.c2
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+
+# hash_join_with_date64
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-03 1970-01-03T00:00:00 456 def NULL NULL NULL NULL
+1970-01-04 NULL -123.12 jkl NULL NULL NULL NULL
+NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 0 qwerty
+
+
+# explain hash_join_with_decimal
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3
+----
+logical_plan
+Right Join: t1.c3 = t2.c3
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+
+# hash_join_with_decimal
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 1.23 abc
+1970-01-03 1970-01-03T00:00:00 456 def 1970-01-03 1970-01-03T00:00:00 456 def
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL -123.12 jkl
+NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 789 ghi
+
+# explain hash_join_with_dictionary (the plan below is for a self-join of t1 on the dictionary column c4)
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t1 t2 on t1.c4 = t2.c4
+----
+logical_plan
+Inner Join: t1.c4 = t2.c4
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+
+# hash_join_with_dictionary (t1 joined to t2 on the dictionary column c4)
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c4 = t2.c4
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
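+
+# With prefer_hash_join disabled, the equi-joins below are planned as
+# SortMergeJoin: each input is hash-repartitioned on its join key and sorted
+# before the merge.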
+
+# explain sort_merge_join_on_date32: inner sort merge join on a Date32 column
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+logical_plan
+Inner Join: t1.c1 = t2.c1
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+physical_plan
+SortMergeJoin: join_type=Inner, on=[(Column { name: "c1", index: 0 }, Column { name: "c1", index: 0 })]
+--SortExec: expr=[c1@0 ASC]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
+--SortExec: expr=[c1@0 ASC]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
+
+# sort_merge_join_on_date32: inner sort merge join on a Date32 column
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1
+----
+1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe
+
+# explain sort_merge_join_on_decimal: right sort merge join on a Decimal column
+query TT
+explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3
+----
+logical_plan
+Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3
+--SubqueryAlias: t1
+----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4]
+--SubqueryAlias: t2
+----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4]
+physical_plan
+ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]
+--SortMergeJoin: join_type=Right, on=[(Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }, Column { name: "c3", index: 2 })]
+----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]
+------CoalesceBatchesExec: target_batch_size=4096
+--------RepartitionExec: partitioning=Hash([Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }], 2), input_partitions=2
+----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+----SortExec: expr=[c3@2 ASC]
+------CoalesceBatchesExec: target_batch_size=4096
+--------RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 2), input_partitions=2
+----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+# sort_merge_join_on_decimal: right sort merge join on a Decimal column
+query DDR?DDR? rowsort
+select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3
+----
+1970-01-04 NULL -123.12 jkl 1970-01-02 1970-01-02T00:00:00 -123.12 abc
+NULL 1970-01-04T00:00:00 789 ghi 1970-01-04 NULL 789 qwe
+NULL NULL NULL NULL NULL 1970-01-04T00:00:00 0 qwerty
+NULL NULL NULL NULL NULL NULL 100000 abcdefg
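+
+# The last two result rows are t2 rows (c3 = 0 and 100000) with no matching
+# t1.c3; right join semantics pad their t1 side with NULLs.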
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+
+# Test the left_semi_join scenarios with the repartition_joins parameter set
+# to true: the planner produces a Partitioned HashJoinExec whose inputs are
+# hash-repartitioned on the join keys.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query I rowsort
+SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id
+----
+11
+22
+44
+NULL
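+
+# Note: INTERSECT applies distinct (set) semantics, under which two NULLs are
+# not distinct, so the NULL ids match each other here even though the `=`
+# comparison in the IN / EXISTS queries above never matches NULL.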
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
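+
+# Illustrative companion query (not part of the original joins.rs port): the
+# anti-join dual of the semi joins above, written with NOT EXISTS; 33 and the
+# NULL id survive because no t2_id equals them.
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE NOT EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id
+----
+33 c
+NULL e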
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+# Test the left_semi_join scenarios with the repartition_joins parameter set
+# to false: the planner falls back to a CollectLeft HashJoinExec, collecting
+# the build side instead of repartitioning the inputs.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query IT rowsort
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+query I rowsort
+SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id
+----
+11
+22
+44
+NULL
+
+query TT
+explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
+----
+11 a
+11 a
+22 b
+44 d
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test the right_semi_join scenarios with the repartition_joins parameter set to true.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
+--SortExec: expr=[t1_id@0 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=4096
+------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
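+
+# Illustrative companion query (not part of the original joins.rs port): the
+# anti-join counterpart via NOT EXISTS; every t1 row except (11, "a", 1)
+# survives, because only id 11 has a t2 match with a differing name.
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE NOT EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 WHERE t2.t2_id = t1.t1_id AND t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+22 b 2
+33 c 3
+44 d 4
+NULL e 0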
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+
+# Test the right_semi_join scenarios with the repartition_joins parameter set to false.
+####
+# Config setup
+####
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+physical_plan
+SortExec: expr=[t1_id@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=4096
+----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1
+------MemoryExec: partitions=1, partition_sizes=[1]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITI rowsort
+SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
+----
+11 a 1
+
+####
+# Config teardown
+####
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.repartition_joins = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4096;
+