You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/19 14:44:12 UTC
[arrow-datafusion] branch master updated: Support join-filter pushdown for semi/anti join (#4923)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new dde23efed Support join-filter pushdown for semi/anti join (#4923)
dde23efed is described below
commit dde23efed94704044822bcefe49c0af7f9260088
Author: ygf11 <ya...@gmail.com>
AuthorDate: Thu Jan 19 22:44:05 2023 +0800
Support join-filter pushdown for semi/anti join (#4923)
* Support filter pushdown for semi/anti join
* improve comment
* add subquery.slt for sqllogictests
* add tests
* improve test
---
datafusion/core/tests/dataframe.rs | 89 ++++++++++
datafusion/core/tests/sql/joins.rs | 71 ++++++--
.../core/tests/sqllogictests/test_files/join.slt | 43 ++++-
.../tests/sqllogictests/test_files/subquery.slt | 62 +++++++
datafusion/optimizer/src/push_down_filter.rs | 189 ++++++++++++++++++++-
5 files changed, 431 insertions(+), 23 deletions(-)
diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs
index e4d9d1c4e..39b7739af 100644
--- a/datafusion/core/tests/dataframe.rs
+++ b/datafusion/core/tests/dataframe.rs
@@ -411,6 +411,95 @@ async fn join_with_alias_filter() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn right_semi_with_alias_filter() -> Result<()> {
+ let join_ctx = create_join_context()?;
+ let t1 = join_ctx.table("t1").await?;
+ let t2 = join_ctx.table("t2").await?;
+
+ // t1.a = t2.a and t1.c > 1 and t2.c > 1
+ let filter = col("t1.a")
+ .eq(col("t2.a"))
+ .and(col("t1.c").gt(lit(1u32)))
+ .and(col("t2.c").gt(lit(1u32)));
+
+ let df = t1
+ .join(t2, JoinType::RightSemi, &[], &[], Some(filter))?
+ .select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+ let expected = vec![
+ "Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
+ " RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
+ " Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
+ " TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
+ " Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
+ " TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
+ ];
+
+ let formatted = optimized_plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let results = df.collect().await?;
+ let expected: Vec<&str> = vec![
+ "+-----+---+---+",
+ "| a | b | c |",
+ "+-----+---+---+",
+ "| 10 | b | 2 |",
+ "| 100 | d | 4 |",
+ "+-----+---+---+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn right_anti_filter_push_down() -> Result<()> {
+ let join_ctx = create_join_context()?;
+ let t1 = join_ctx.table("t1").await?;
+ let t2 = join_ctx.table("t2").await?;
+
+ // t1.a = t2.a and t1.c > 1 and t2.c > 1
+ let filter = col("t1.a")
+ .eq(col("t2.a"))
+ .and(col("t1.c").gt(lit(1u32)))
+ .and(col("t2.c").gt(lit(1u32)));
+
+ let df = t1
+ .join(t2, JoinType::RightAnti, &[], &[], Some(filter))?
+ .select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
+ let optimized_plan = df.clone().into_optimized_plan()?;
+ let expected = vec![
+ "Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
+ " RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
+ " Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
+ " TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
+ " TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
+ ];
+
+ let formatted = optimized_plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let results = df.collect().await?;
+ let expected: Vec<&str> = vec![
+ "+----+---+---+",
+ "| a | b | c |",
+ "+----+---+---+",
+ "| 13 | c | 3 |",
+ "| 3 | a | 1 |",
+ "+----+---+---+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
async fn create_test_table() -> Result<DataFrame> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index ac02a0d57..e0bd1a523 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2112,6 +2112,34 @@ async fn left_semi_join() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn left_semi_join_pushdown() -> Result<()> {
+ let ctx = create_left_semi_anti_join_context_with_null_ids("t1_id", "t2_id", false)
+ .unwrap();
+
+ // assert logical plan
+ let sql = "SELECT t1.t1_id, t1.t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1.t1_id = t2.t2_id and t2.t2_int > 1)";
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+ let plan = dataframe.into_optimized_plan()?;
+ let expected = vec![
+ "Explain [plan_type:Utf8, plan:Utf8]",
+ " Projection: t1.t1_id, t1.t1_name [t1_id:UInt32;N, t1_name:Utf8;N]",
+ " LeftSemi Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
+ " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
+ " Filter: t2.t2_int > UInt32(1) [t2_id:UInt32;N, t2_int:UInt32;N]",
+ " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
+ ];
+ let formatted = plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ Ok(())
+}
+
#[tokio::test]
async fn left_anti_join() -> Result<()> {
let test_repartition_joins = vec![true, false];
@@ -3126,13 +3154,12 @@ async fn in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
let msg = format!("Creating logical plan for '{sql}'");
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
let plan = dataframe.into_optimized_plan().unwrap();
-
- // The `t1.t1_int > UInt32(0)` should be pushdown by `filter push down rule`.
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " LeftSemi Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
@@ -3144,20 +3171,36 @@ async fn in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
+ Ok(())
+}
+
+#[tokio::test]
+async fn not_in_subquery_to_join_with_correlated_outer_filter() -> Result<()> {
+ let ctx = create_join_context("t1_id", "t2_id", false)?;
+
+ let sql = "select t1.t1_id, t1.t1_name, t1.t1_int from t1 where t1.t1_id + 12 not in
+ (select t2.t2_id + 1 from t2 where t1.t1_int > 0)";
+ // assert logical plan
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+ let plan = dataframe.into_optimized_plan().unwrap();
let expected = vec![
- "+-------+---------+--------+",
- "| t1_id | t1_name | t1_int |",
- "+-------+---------+--------+",
- "| 11 | a | 1 |",
- "| 33 | c | 3 |",
- "| 44 | d | 4 |",
- "+-------+---------+--------+",
+ "Explain [plan_type:Utf8, plan:Utf8]",
+ " Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " LeftAnti Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.CAST(t2_id AS Int64) + Int64(1) Filter: t1.t1_int > UInt32(0) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+ " SubqueryAlias: __correlated_sq_1 [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+ " Projection: CAST(t2.t2_id AS Int64) + Int64(1) AS CAST(t2_id AS Int64) + Int64(1) [CAST(t2_id AS Int64) + Int64(1):Int64;N]",
+ " TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
];
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_sorted_eq!(expected, &results);
-
+ let formatted = plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/join.slt b/datafusion/core/tests/sqllogictests/test_files/join.slt
index 4366d99a9..e78925a7f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join.slt
@@ -16,7 +16,7 @@
# under the License.
##########
-## JOIN Tests
+## Join Tests
##########
statement ok
@@ -36,9 +36,48 @@ CREATE TABLE grades(grade INT, min INT, max INT) AS VALUES
(5, 80, 100);
# Regression test: https://github.com/apache/arrow-datafusion/issues/4844
-query I
+query TII
SELECT s.*, g.grade FROM students s join grades g on s.mark between g.min and g.max WHERE grade > 2 ORDER BY s.mark DESC
----
Amina 89 5
Salma 77 4
Christen 50 3
+
+# two tables for join
+statement ok
+CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
+(11, 'a', 1),
+(22, 'b', 2),
+(33, 'c', 3),
+(44, 'd', 4);
+
+statement ok
+CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
+(11, 'z', 3),
+(22, 'y', 1),
+(44, 'x', 3),
+(55, 'w', 3);
+
+# left semi join: a WHERE clause that references a right-side (t2) column must fail
+query error DataFusion error: Schema error: No field named 't2'.'t2_id'. Valid fields are 't1'.'t1_id', 't1'.'t1_name', 't1'.'t1_int'.
+SELECT t1.t1_id,
+ t1.t1_name,
+ t1.t1_int
+FROM t1 LEFT SEMI
+JOIN t2
+ON (
+ t1.t1_id = t2.t2_id)
+WHERE t2.t2_id > 1
+
+# left semi join with on-filter
+query ITI rowsort
+SELECT t1.t1_id,
+ t1.t1_name,
+ t1.t1_int
+FROM t1 LEFT SEMI
+JOIN t2
+ON (
+ t1.t1_id = t2.t2_id and t2.t2_int > 1)
+----
+11 a 1
+44 d 4
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
new file mode 100644
index 000000000..70c3c3e13
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#############
+## Subquery Tests
+#############
+
+# two tables for subquery
+statement ok
+CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
+(11, 'a', 1),
+(22, 'b', 2),
+(33, 'c', 3),
+(44, 'd', 4);
+
+statement ok
+CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
+(11, 'z', 3),
+(22, 'y', 1),
+(44, 'x', 3),
+(55, 'w', 3);
+
+
+# in_subquery_to_join_with_correlated_outer_filter
+query ITI rowsort
+select t1.t1_id,
+ t1.t1_name,
+ t1.t1_int
+from t1
+where t1.t1_id + 12 in (
+ select t2.t2_id + 1 from t2 where t1.t1_int > 0
+ )
+----
+11 a 1
+33 c 3
+44 d 4
+
+# not_in_subquery_to_join_with_correlated_outer_filter
+query ITI rowsort
+select t1.t1_id,
+ t1.t1_name,
+ t1.t1_int
+from t1
+where t1.t1_id + 12 not in (
+ select t2.t2_id + 1 from t2 where t1.t1_int > 0
+ )
+----
+22 b 2
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 92a11bd7e..8d994c6c6 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -108,13 +108,9 @@ fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
JoinType::Left => Ok((false, true)),
JoinType::Right => Ok((true, false)),
JoinType::Full => Ok((false, false)),
- JoinType::LeftSemi
- | JoinType::LeftAnti
- | JoinType::RightSemi
- | JoinType::RightAnti => {
- // filter_push_down does not yet support SEMI/ANTI joins with join conditions
- Ok((false, false))
- }
+ JoinType::LeftSemi | JoinType::RightSemi => Ok((true, true)),
+ JoinType::LeftAnti => Ok((false, true)),
+ JoinType::RightAnti => Ok((true, false)),
},
LogicalPlan::CrossJoin(_) => Err(DataFusionError::Internal(
"on_lr_is_preserved cannot be applied to CROSSJOIN nodes".to_string(),
@@ -2328,4 +2324,183 @@ mod tests {
.expect("failed to optimize plan");
assert_optimized_plan_eq(&optimized_plan, expected)
}
+
+ #[test]
+ fn left_semi_join_with_filters() -> Result<()> {
+ let left = test_table_scan_with_name("test1")?;
+ let right_table_scan = test_table_scan_with_name("test2")?;
+ let right = LogicalPlanBuilder::from(right_table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(left)
+ .join(
+ right,
+ JoinType::LeftSemi,
+ (
+ vec![Column::from_qualified_name("test1.a")],
+ vec![Column::from_qualified_name("test2.a")],
+ ),
+ Some(
+ col("test1.b")
+ .gt(lit(1u32))
+ .and(col("test2.b").gt(lit(2u32))),
+ ),
+ )?
+ .build()?;
+
+ // not part of the test, just good to know:
+ assert_eq!(
+ format!("{plan:?}"),
+ "LeftSemi Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n TableScan: test2",
+ );
+
+ // The filters on both sides will be pushed down.
+ let expected = "\
+ LeftSemi Join: test1.a = test2.a\
+ \n Filter: test1.b > UInt32(1)\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n Filter: test2.b > UInt32(2)\
+ \n TableScan: test2";
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
+ #[test]
+ fn right_semi_join_with_filters() -> Result<()> {
+ let left = test_table_scan_with_name("test1")?;
+ let right_table_scan = test_table_scan_with_name("test2")?;
+ let right = LogicalPlanBuilder::from(right_table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(left)
+ .join(
+ right,
+ JoinType::RightSemi,
+ (
+ vec![Column::from_qualified_name("test1.a")],
+ vec![Column::from_qualified_name("test2.a")],
+ ),
+ Some(
+ col("test1.b")
+ .gt(lit(1u32))
+ .and(col("test2.b").gt(lit(2u32))),
+ ),
+ )?
+ .build()?;
+
+ // not part of the test, just good to know:
+ assert_eq!(
+ format!("{plan:?}"),
+ "RightSemi Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n TableScan: test2",
+ );
+
+ // The filters on both sides will be pushed down.
+ let expected = "\
+ RightSemi Join: test1.a = test2.a\
+ \n Filter: test1.b > UInt32(1)\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n Filter: test2.b > UInt32(2)\
+ \n TableScan: test2";
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
+ #[test]
+ fn left_anti_join_with_filters() -> Result<()> {
+ let table_scan = test_table_scan_with_name("test1")?;
+ let left = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let right_table_scan = test_table_scan_with_name("test2")?;
+ let right = LogicalPlanBuilder::from(right_table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(left)
+ .join(
+ right,
+ JoinType::LeftAnti,
+ (
+ vec![Column::from_qualified_name("test1.a")],
+ vec![Column::from_qualified_name("test2.a")],
+ ),
+ Some(
+ col("test1.b")
+ .gt(lit(1u32))
+ .and(col("test2.b").gt(lit(2u32))),
+ ),
+ )?
+ .build()?;
+
+ // not part of the test, just good to know:
+ assert_eq!(
+ format!("{plan:?}"),
+ "LeftAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+ \n Projection: test1.a, test1.b\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n TableScan: test2",
+ );
+
+ // For left anti, only the filter on the right side can be pushed down.
+ let expected = "\
+ LeftAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1)\
+ \n Projection: test1.a, test1.b\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n Filter: test2.b > UInt32(2)\
+ \n TableScan: test2";
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
+ #[test]
+ fn right_anti_join_with_filters() -> Result<()> {
+ let table_scan = test_table_scan_with_name("test1")?;
+ let left = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let right_table_scan = test_table_scan_with_name("test2")?;
+ let right = LogicalPlanBuilder::from(right_table_scan)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let plan = LogicalPlanBuilder::from(left)
+ .join(
+ right,
+ JoinType::RightAnti,
+ (
+ vec![Column::from_qualified_name("test1.a")],
+ vec![Column::from_qualified_name("test2.a")],
+ ),
+ Some(
+ col("test1.b")
+ .gt(lit(1u32))
+ .and(col("test2.b").gt(lit(2u32))),
+ ),
+ )?
+ .build()?;
+
+ // not part of the test, just good to know:
+ assert_eq!(
+ format!("{plan:?}"),
+ "RightAnti Join: test1.a = test2.a Filter: test1.b > UInt32(1) AND test2.b > UInt32(2)\
+ \n Projection: test1.a, test1.b\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n TableScan: test2",
+ );
+
+ // For right anti, only the filter on the left side can be pushed down.
+ let expected = "RightAnti Join: test1.a = test2.a Filter: test2.b > UInt32(2)\
+ \n Projection: test1.a, test1.b\
+ \n Filter: test1.b > UInt32(1)\
+ \n TableScan: test1\
+ \n Projection: test2.a, test2.b\
+ \n TableScan: test2";
+ assert_optimized_plan_eq(&plan, expected)
+ }
}