You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/19 14:44:12 UTC

[arrow-datafusion] branch master updated: Support join-filter pushdown for semi/anti join (#4923)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new dde23efed Support join-filter pushdown for semi/anti join (#4923)
dde23efed is described below

commit dde23efed94704044822bcefe49c0af7f9260088
Author: ygf11 <ya...@gmail.com>
AuthorDate: Thu Jan 19 22:44:05 2023 +0800

    Support join-filter pushdown for semi/anti join (#4923)
    
    * Support filter pushdown for semi/anti join
    
    * improve comment
    
    * add subquery.slt for sqllogictests
    
    * add tests
    
    * improve test
---
 datafusion/core/tests/dataframe.rs                 |  89 ++++++++++
 datafusion/core/tests/sql/joins.rs                 |  71 ++++++--
 .../core/tests/sqllogictests/test_files/join.slt   |  43 ++++-
 .../tests/sqllogictests/test_files/subquery.slt    |  62 +++++++
 datafusion/optimizer/src/push_down_filter.rs       | 189 ++++++++++++++++++++-
 5 files changed, 431 insertions(+), 23 deletions(-)

diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs
index e4d9d1c4e..39b7739af 100644
--- a/datafusion/core/tests/dataframe.rs
+++ b/datafusion/core/tests/dataframe.rs
@@ -411,6 +411,95 @@ async fn join_with_alias_filter() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn right_semi_with_alias_filter() -> Result<()> {
+    let join_ctx = create_join_context()?;
+    let t1 = join_ctx.table("t1").await?;
+    let t2 = join_ctx.table("t2").await?;
+
+    // t1.a = t2.a and t1.c > 1 and t2.c > 1
+    let filter = col("t1.a")
+        .eq(col("t2.a"))
+        .and(col("t1.c").gt(lit(1u32)))
+        .and(col("t2.c").gt(lit(1u32)));
+
+    let df = t1
+        .join(t2, JoinType::RightSemi, &[], &[], Some(filter))?
+        .select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+    let expected = vec![
+        "Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
+        "  RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
+        "    Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
+        "      TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
+        "    Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
+        "      TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
+    ];
+
+    let formatted = optimized_plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let results = df.collect().await?;
+    let expected: Vec<&str> = vec![
+        "+-----+---+---+",
+        "| a   | b | c |",
+        "+-----+---+---+",
+        "| 10  | b | 2 |",
+        "| 100 | d | 4 |",
+        "+-----+---+---+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn right_anti_filter_push_down() -> Result<()> {
+    let join_ctx = create_join_context()?;
+    let t1 = join_ctx.table("t1").await?;
+    let t2 = join_ctx.table("t2").await?;
+
+    // t1.a = t2.a and t1.c > 1 and t2.c > 1
+    let filter = col("t1.a")
+        .eq(col("t2.a"))
+        .and(col("t1.c").gt(lit(1u32)))
+        .and(col("t2.c").gt(lit(1u32)));
+
+    let df = t1
+        .join(t2, JoinType::RightAnti, &[], &[], Some(filter))?
+        .select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
+    let optimized_plan = df.clone().into_optimized_plan()?;
+    let expected = vec![
+        "Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
+        "  RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
+        "    Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
+        "      TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
+        "    TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
+    ];
+
+    let formatted = optimized_plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let results = df.collect().await?;
+    let expected: Vec<&str> = vec![
+        "+----+---+---+",
+        "| a  | b | c |",
+        "+----+---+---+",
+        "| 13 | c | 3 |",
+        "| 3  | a | 1 |",
+        "+----+---+---+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
 async fn create_test_table() -> Result<DataFrame> {
     let schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index ac02a0d57..e0bd1a523 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2112,6 +2112,34 @@ async fn left_semi_join() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn left_semi_join_pushdown() -> Result<()> {
+    let ctx = create_left_semi_anti_join_context_with_null_ids("t1_id", "t2_id", false)
+        .unwrap();
+
+    // assert logical plan
+    let sql = "SELECT t1.t1_id, t1.t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1.t1_id = t2.t2_id and t2.t2_int > 1)";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan()?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: t1.t1_id, t1.t1_name [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "    LeftSemi Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "      TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+        "      Filter: t2.t2_int > UInt32(1) [t2_id:UInt32;N, t2_int:UInt32;N]",
+        "        TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]", 
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn left_anti_join() -> Result<()> {
     let test_repartition_joins = vec![true, false];
@@ -3126,13 +3154,12 @@ async fn in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
     let msg = format!("Creating logical plan for '{sql}'");
     let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
     let plan = dataframe.into_optimized_plan().unwrap();
-
-    // The `t1.t1_int > UInt32(0)` should be pushdown by `filter push down rule`.
     let expected = vec![
         "Explain [plan_type:Utf8, plan:Utf8]",
         "  Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "      TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "    LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
         "      SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
         "        Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
         "          TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
@@ -3144,20 +3171,36 @@ async fn in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
         expected, actual,
         "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
     );
+    Ok(())
+}
+
+#[tokio::test]
+async fn not_in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", false)?;
+
+    let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 12 not in 
+                         (select t2.t2_id + 1 from t2 where t1.t1_int > 0)";
 
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
     let expected = vec![
-        "+-------+---------+--------+",
-        "| t1_id | t1_name | t1_int |",
-        "+-------+---------+--------+",
-        "| 11    | a       | 1      |",
-        "| 33    | c       | 3      |",
-        "| 44    | d       | 4      |",
-        "+-------+---------+--------+",
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "    LeftAnti Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "        Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+        "          TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
     ];
 
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_sorted_eq!(expected, &results);
-
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
     Ok(())
 }
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt b/datafusion/core/tests/sqllogictests/test_files/join.slt
index 4366d99a9..e78925a7f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join.slt
@@ -16,7 +16,7 @@
 # under the License.
 
 ##########
-## JOIN Tests
+## Join Tests
 ##########
 
 statement ok
@@ -36,9 +36,48 @@ CREATE TABLE grades(grade INT, min INT, max INT) AS VALUES
 (5, 80, 100);
 
 # Regression test: https://github.com/apache/arrow-datafusion/issues/4844
-query I
+query TII
 SELECT s.*, g.grade FROM students s join grades g on s.mark between g.min and g.max WHERE grade > 2 ORDER BY s.mark DESC
 ----
 Amina 89 5
 Salma 77 4
 Christen 50 3
+
+# two tables for join
+statement ok
+CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
+(11, 'a', 1),
+(22, 'b', 2),
+(33, 'c', 3),
+(44, 'd', 4);
+
+statement ok
+CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
+(11, 'z', 3),
+(22, 'y', 1),
+(44, 'x', 3),
+(55, 'w', 3);
+
+# left semi join: the WHERE clause must not reference t2, because a left semi join only outputs t1 columns
+query error DataFusion error: Schema error: No field named 't2'.'t2_id'. Valid fields are 't1'.'t1_id', 't1'.'t1_name', 't1'.'t1_int'.
+SELECT t1.t1_id,
+       t1.t1_name,
+       t1.t1_int
+FROM   t1 LEFT SEMI
+JOIN   t2
+ON     (
+              t1.t1_id = t2.t2_id)
+WHERE  t2.t2_id > 1
+
+# left semi join with on-filter
+query ITI rowsort
+SELECT t1.t1_id,
+       t1.t1_name,
+       t1.t1_int
+FROM   t1 LEFT SEMI
+JOIN   t2
+ON     (
+              t1.t1_id = t2.t2_id and t2.t2_int > 1)
+----
+11 a 1
+44 d 4
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
new file mode 100644
index 000000000..70c3c3e13
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#############
+## Subquery Tests
+#############
+
+# two tables for subquery
+statement ok
+CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
+(11, 'a', 1),
+(22, 'b', 2),
+(33, 'c', 3),
+(44, 'd', 4);
+
+statement ok
+CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
+(11, 'z', 3),
+(22, 'y', 1),
+(44, 'x', 3),
+(55, 'w', 3);
+
+
+# in_subquery_to_join_with_correlated_outer_filter
+query ITI rowsort
+select t1.t1_id,
+       t1.t1_name,
+       t1.t1_int
+from t1
+where t1.t1_id + 12 in (
+                           select t2.t2_id + 1 from t2 where t1.t1_int > 0
+                       )
+----
+11 a 1
+33 c 3
+44 d 4
+
+# not_in_subquery_to_join_with_correlated_outer_filter
+query ITI rowsort
+select t1.t1_id,
+       t1.t1_name,
+       t1.t1_int
+from t1
+where t1.t1_id + 12 not in (
+                               select t2.t2_id + 1 from t2 where t1.t1_int > 0
+                           )
+----
+22 b 2
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 92a11bd7e..8d994c6c6 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -108,13 +108,9 @@ fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
             JoinType::Left => Ok((false, true)),
             JoinType::Right => Ok((true, false)),
             JoinType::Full => Ok((false, false)),
-            JoinType::LeftSemi
-            | JoinType::LeftAnti
-            | JoinType::RightSemi
-            | JoinType::RightAnti => {
-                // filter_push_down does not yet support SEMI/ANTI joins with join conditions
-                Ok((false, false))
-            }
+            JoinType::LeftSemi | JoinType::RightSemi => Ok((true, true)),
+            JoinType::LeftAnti => Ok((false, true)),
+            JoinType::RightAnti => Ok((true, false)),
         },
         LogicalPlan::CrossJoin(_) => Err(DataFusionError::Internal(
             "on_lr_is_preserved cannot be applied to CROSSJOIN nodes".to_string(),
@@ -2328,4 +2324,183 @@ mod tests {
             .expect("failed to optimize plan");
         assert_optimized_plan_eq(&optimized_plan, expected)
     }
+
+    #[test]
+    fn left_semi_join_with_filters() -> Result<()> {
+        let left = test_table_scan_with_name("test1")?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                right,
+                JoinType::LeftSemi,
+                (
+                    vec![Column::from_qualified_name("test1.a")],
+                    vec![Column::from_qualified_name("test2.a")],
+                ),
+                Some(
+                    col("test1.b")
+                        .gt(lit(1u32))
+                        .and(col("test2.b").gt(lit(2u32))),
+                ),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{plan:?}"),
+            "LeftSemi Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+            \n  TableScan: test1\
+            \n  Projection: test2.a, test2.b\
+            \n    TableScan: test2",
+        );
+
+        // The filters on both sides will be pushed down.
+        let expected = "\
+        LeftSemi Join: test1.a = test2.a\
+        \n  Filter: test1.b > UInt32(1)\
+        \n    TableScan: test1\
+        \n  Projection: test2.a, test2.b\
+        \n    Filter: test2.b > UInt32(2)\
+        \n      TableScan: test2";
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn right_semi_join_with_filters() -> Result<()> {
+        let left = test_table_scan_with_name("test1")?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                right,
+                JoinType::RightSemi,
+                (
+                    vec![Column::from_qualified_name("test1.a")],
+                    vec![Column::from_qualified_name("test2.a")],
+                ),
+                Some(
+                    col("test1.b")
+                        .gt(lit(1u32))
+                        .and(col("test2.b").gt(lit(2u32))),
+                ),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{plan:?}"),
+            "RightSemi Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+            \n  TableScan: test1\
+            \n  Projection: test2.a, test2.b\
+            \n    TableScan: test2",
+        );
+
+        // The filters on both sides will be pushed down.
+        let expected = "\
+        RightSemi Join: test1.a = test2.a\
+        \n  Filter: test1.b > UInt32(1)\
+        \n    TableScan: test1\
+        \n  Projection: test2.a, test2.b\
+        \n    Filter: test2.b > UInt32(2)\
+        \n      TableScan: test2";
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn left_anti_join_with_filters() -> Result<()> {
+        let table_scan = test_table_scan_with_name("test1")?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                right,
+                JoinType::LeftAnti,
+                (
+                    vec![Column::from_qualified_name("test1.a")],
+                    vec![Column::from_qualified_name("test2.a")],
+                ),
+                Some(
+                    col("test1.b")
+                        .gt(lit(1u32))
+                        .and(col("test2.b").gt(lit(2u32))),
+                ),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{plan:?}"),
+            "LeftAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+            \n  Projection: test1.a, test1.b\
+            \n    TableScan: test1\
+            \n  Projection: test2.a, test2.b\
+            \n    TableScan: test2",
+        );
+
+        // For left anti, only the filter on the right side can be pushed down.
+        let expected = "\
+        LeftAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1)\
+        \n  Projection: test1.a, test1.b\
+        \n    TableScan: test1\
+        \n  Projection: test2.a, test2.b\
+        \n    Filter: test2.b > UInt32(2)\
+        \n      TableScan: test2";
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn right_anti_join_with_filters() -> Result<()> {
+        let table_scan = test_table_scan_with_name("test1")?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                right,
+                JoinType::RightAnti,
+                (
+                    vec![Column::from_qualified_name("test1.a")],
+                    vec![Column::from_qualified_name("test2.a")],
+                ),
+                Some(
+                    col("test1.b")
+                        .gt(lit(1u32))
+                        .and(col("test2.b").gt(lit(2u32))),
+                ),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{plan:?}"),
+            "RightAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+            \n  Projection: test1.a, test1.b\
+            \n    TableScan: test1\
+            \n  Projection: test2.a, test2.b\
+            \n    TableScan: test2",
+        );
+
+        // For right anti, only the filter on the left side can be pushed down.
+        let expected = "RightAnti Join: test1.a = test2.a Filter: test2.b > UInt32(2)\
+        \n  Projection: test1.a, test1.b\
+        \n    Filter: test1.b > UInt32(1)\
+        \n      TableScan: test1\
+        \n  Projection: test2.a, test2.b\
+        \n    TableScan: test2";
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }