You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/28 21:03:03 UTC

[arrow-datafusion] branch master updated: remove `SubqueryFilterToJoin` (#4731)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fc8fb0631 remove `SubqueryFilterToJoin` (#4731)
fc8fb0631 is described below

commit fc8fb0631bf234c3575834adaea0e4a25c996004
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Dec 29 05:02:57 2022 +0800

    remove `SubqueryFilterToJoin` (#4731)
    
    * remove `SubqueryFilterToJoin`
    
    * correct UT
---
 datafusion/optimizer/src/decorrelate_where_in.rs   | 188 ++++++++-
 datafusion/optimizer/src/lib.rs                    |   1 -
 datafusion/optimizer/src/optimizer.rs              |   2 -
 .../optimizer/src/subquery_filter_to_join.rs       | 421 ---------------------
 4 files changed, 187 insertions(+), 425 deletions(-)

diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index bcc120723..c2a80ac2b 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -220,10 +220,196 @@ mod tests {
     use crate::test::*;
     use datafusion_common::Result;
     use datafusion_expr::{
-        col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery,
+        and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder,
+        not_in_subquery, or, Operator,
     };
     use std::ops::Add;
 
+    fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
+        assert_optimized_plan_eq_display_indent(
+            Arc::new(DecorrelateWhereIn::new()),
+            plan,
+            expected,
+        );
+        Ok(())
+    }
+
+    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
+        let table_scan = test_table_scan_with_name(name)?;
+        Ok(Arc::new(
+            LogicalPlanBuilder::from(table_scan)
+                .project(vec![col("c")])?
+                .build()?,
+        ))
+    }
+
+    /// Test for several IN subquery expressions
+    #[test]
+    fn in_subquery_multiple() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(and(
+                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
+                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n        Projection: sq_1.c AS c [c:UInt32]\
+        \n          TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: __sq_2 [c:UInt32]\
+        \n      Projection: sq_2.c AS c [c:UInt32]\
+        \n        TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
+    /// Test for IN subquery with additional AND filter
+    #[test]
+    fn in_subquery_with_and_filters() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(and(
+                in_subquery(col("c"), test_subquery_with_name("sq")?),
+                and(
+                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                ),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n        Projection: sq.c AS c [c:UInt32]\
+        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
+    /// Test for IN subquery with additional OR filter
+    /// filter expression not modified
+    #[test]
+    fn in_subquery_with_or_filters() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(or(
+                and(
+                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                ),
+                in_subquery(col("c"), test_subquery_with_name("sq")?),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Subquery: [c:UInt32]\
+        \n      Projection: sq.c [c:UInt32]\
+        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
+    #[test]
+    fn in_subquery_with_and_or_filters() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(and(
+                or(
+                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+                    in_subquery(col("b"), test_subquery_with_name("sq1")?),
+                ),
+                in_subquery(col("c"), test_subquery_with_name("sq2")?),
+            ))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  Filter: test.a = UInt32(1) OR test.b IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+        \n    Subquery: [c:UInt32]\
+        \n      Projection: sq1.c [c:UInt32]\
+        \n        TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n        Projection: sq2.c AS c [c:UInt32]\
+        \n          TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
+    /// Test for nested IN subqueries
+    #[test]
+    fn in_subquery_nested() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
+            .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
+            .project(vec![col("a")])?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("b"), Arc::new(subquery)))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: __sq_1 [a:UInt32]\
+        \n      Projection: sq.a AS a [a:UInt32]\
+        \n        LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
+        \n          SubqueryAlias: __sq_2 [c:UInt32]\
+        \n            Projection: sq_nested.c AS c [c:UInt32]\
+        \n              TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
+    /// Test for filter input modification in case filter not supported
+    /// Outer filter expression not modified while inner converted to join
+    #[test]
+    fn in_subquery_input_modified() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
+            .project(vec![col("b"), col("c")])?
+            .alias("wrapped")?
+            .filter(or(
+                binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+                in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
+            ))?
+            .project(vec![col("b")])?
+            .build()?;
+
+        let expected =  "Projection: wrapped.b [b:UInt32]\
+        \n  Filter: wrapped.b < UInt32(30) OR wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
+        \n    Subquery: [c:UInt32]\
+        \n      Projection: sq_outer.c [c:UInt32]\
+        \n        TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
+        \n      Projection: test.b, test.c [b:UInt32, c:UInt32]\
+        \n        LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n          SubqueryAlias: __sq_1 [c:UInt32]\
+        \n            Projection: sq_inner.c AS c [c:UInt32]\
+        \n              TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
     #[cfg(test)]
     #[ctor::ctor]
     fn init() {
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index b03725fe6..27e6dff08 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -33,7 +33,6 @@ pub mod push_down_projection;
 pub mod scalar_subquery_to_join;
 pub mod simplify_expressions;
 pub mod single_distinct_to_groupby;
-pub mod subquery_filter_to_join;
 pub mod type_coercion;
 pub mod utils;
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 36968f2f1..b27c49d2f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -35,7 +35,6 @@ use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
 use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
 use crate::simplify_expressions::SimplifyExpressions;
 use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
-use crate::subquery_filter_to_join::SubqueryFilterToJoin;
 use crate::type_coercion::TypeCoercion;
 use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
 use chrono::{DateTime, Utc};
@@ -244,7 +243,6 @@ impl Optimizer {
             Arc::new(DecorrelateWhereExists::new()),
             Arc::new(DecorrelateWhereIn::new()),
             Arc::new(ScalarSubqueryToJoin::new()),
-            Arc::new(SubqueryFilterToJoin::new()),
             // simplify expressions does not simplify expressions in subqueries, so we
             // run it again after running the optimizations that potentially converted
             // subqueries to joins
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
deleted file mode 100644
index 696f911a1..000000000
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ /dev/null
@@ -1,421 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Optimizer rule for rewriting subquery filters to joins
-//!
-//! It handles standalone parts of logical conjunction expressions, i.e.
-//! ```text
-//!   WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x'
-//! ```
-//! will be rewritten, but
-//! ```text
-//!   WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x'
-//! ```
-//! won't
-use crate::optimizer::ApplyOrder;
-use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::{
-    expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
-    logical_plan::{
-        builder::build_join_schema, Filter, Join, JoinConstraint, JoinType, LogicalPlan,
-    },
-    Expr,
-};
-use std::sync::Arc;
-
-/// Optimizer rule for rewriting subquery filters to joins
-#[derive(Default)]
-pub struct SubqueryFilterToJoin {}
-
-impl SubqueryFilterToJoin {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl OptimizerRule for SubqueryFilterToJoin {
-    fn try_optimize(
-        &self,
-        plan: &LogicalPlan,
-        _config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Filter(filter) => {
-                // Apply optimizer rule to current input
-                let input = filter.input.as_ref().clone();
-
-                // Splitting filter expression into components by AND
-                let filters = utils::split_conjunction(&filter.predicate);
-
-                // Searching for subquery-based filters
-                let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
-                    filters
-                        .into_iter()
-                        .partition(|&e| matches!(e, Expr::InSubquery { .. }));
-
-                // Check all subquery filters could be rewritten
-                //
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let mut subqueries_in_regular = vec![];
-                regular_filters.iter().try_for_each(|&e| {
-                    extract_subquery_filters(e, &mut subqueries_in_regular)
-                })?;
-
-                if !subqueries_in_regular.is_empty() {
-                    return Ok(Some(LogicalPlan::Filter(Filter::try_new(
-                        filter.predicate.clone(),
-                        Arc::new(input),
-                    )?)));
-                };
-
-                // Add subquery joins to new_input
-                // optimized_input value should retain for possible optimization rollback
-                let opt_result = subquery_filters.iter().try_fold(
-                    input.clone(),
-                    |input, &e| match e {
-                        Expr::InSubquery {
-                            expr,
-                            subquery,
-                            negated,
-                        } => {
-                            let right_input = self.try_optimize(
-                                &subquery.subquery,
-                                _config
-                            )?.unwrap_or_else(||subquery.subquery.as_ref().clone());
-                            let right_schema = right_input.schema();
-                            if right_schema.fields().len() != 1 {
-                                return Err(DataFusionError::Plan(
-                                    "Only single column allowed in InSubquery"
-                                        .to_string(),
-                                ));
-                            };
-
-                            let right_key = right_schema.field(0).qualified_column();
-                            let left_key = match *expr.clone() {
-                                Expr::Column(col) => col,
-                                _ => return Err(DataFusionError::NotImplemented(
-                                    "Filtering by expression not implemented for InSubquery"
-                                        .to_string(),
-                                )),
-                            };
-
-                            let join_type = if *negated {
-                                JoinType::LeftAnti
-                            } else {
-                                JoinType::LeftSemi
-                            };
-
-                            let schema = build_join_schema(
-                                input.schema(),
-                                right_schema,
-                                &join_type,
-                            )?;
-
-                            Ok(LogicalPlan::Join(Join {
-                                left: Arc::new(input),
-                                right: Arc::new(right_input),
-                                on: vec![(Expr::Column(left_key), Expr::Column(right_key))],
-                                filter: None,
-                                join_type,
-                                join_constraint: JoinConstraint::On,
-                                schema: Arc::new(schema),
-                                null_equals_null: false,
-                            }))
-                        }
-                        _ => Err(DataFusionError::Plan(
-                            "Unknown expression while rewriting subquery to joins"
-                                .to_string(),
-                        )),
-                    }
-                );
-
-                // In case of expressions which could not be rewritten
-                // return original filter with optimized input
-                let new_input = match opt_result {
-                    Ok(plan) => plan,
-                    Err(_) => {
-                        return Ok(Some(LogicalPlan::Filter(Filter::try_new(
-                            filter.predicate.clone(),
-                            Arc::new(input),
-                        )?)))
-                    }
-                };
-
-                // Apply regular filters to join output if some or just return join
-                if regular_filters.is_empty() {
-                    Ok(Some(new_input))
-                } else {
-                    Ok(Some(utils::add_filter(new_input, &regular_filters)?))
-                }
-            }
-            _ => Ok(None),
-        }
-    }
-
-    fn name(&self) -> &str {
-        "subquery_filter_to_join"
-    }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-}
-
-fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Result<()> {
-    struct InSubqueryVisitor<'a> {
-        accum: &'a mut Vec<Expr>,
-    }
-
-    impl ExpressionVisitor for InSubqueryVisitor<'_> {
-        fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
-            if let Expr::InSubquery { .. } = expr {
-                self.accum.push(expr.to_owned());
-            }
-            Ok(Recursion::Continue(self))
-        }
-    }
-
-    expression.accept(InSubqueryVisitor { accum: extracted })?;
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test::*;
-    use datafusion_expr::{
-        and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder,
-        not_in_subquery, or, Operator,
-    };
-
-    fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
-        assert_optimized_plan_eq_display_indent(
-            Arc::new(SubqueryFilterToJoin::new()),
-            plan,
-            expected,
-        );
-        Ok(())
-    }
-
-    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
-        let table_scan = test_table_scan_with_name(name)?;
-        Ok(Arc::new(
-            LogicalPlanBuilder::from(table_scan)
-                .project(vec![col("c")])?
-                .build()?,
-        ))
-    }
-
-    /// Test for single IN subquery filter
-    #[test]
-    fn in_subquery_simple() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: sq.c [c:UInt32]\
-        \n      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for single NOT IN subquery filter
-    #[test]
-    fn not_in_subquery_simple() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: sq.c [c:UInt32]\
-        \n      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for several IN subquery expressions
-    #[test]
-    fn in_subquery_multiple() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(and(
-                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
-                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
-            ))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n    LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      Projection: sq_1.c [c:UInt32]\
-        \n        TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: sq_2.c [c:UInt32]\
-        \n      TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for IN subquery with additional AND filter
-    #[test]
-    fn in_subquery_with_and_filters() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(and(
-                in_subquery(col("c"), test_subquery_with_name("sq")?),
-                and(
-                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
-                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
-                ),
-            ))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
-        \n    LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      Projection: sq.c [c:UInt32]\
-        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for IN subquery with additional OR filter
-    /// filter expression not modified
-    #[test]
-    fn in_subquery_with_or_filters() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(or(
-                and(
-                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
-                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
-                ),
-                in_subquery(col("c"), test_subquery_with_name("sq")?),
-            ))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Subquery: [c:UInt32]\
-        \n      Projection: sq.c [c:UInt32]\
-        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    #[test]
-    fn in_subquery_with_and_or_filters() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(and(
-                or(
-                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
-                    in_subquery(col("b"), test_subquery_with_name("sq1")?),
-                ),
-                in_subquery(col("c"), test_subquery_with_name("sq2")?),
-            ))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  Filter: (test.a = UInt32(1) OR test.b IN (<subquery>)) AND test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Subquery: [c:UInt32]\
-        \n      Projection: sq1.c [c:UInt32]\
-        \n        TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Subquery: [c:UInt32]\
-        \n      Projection: sq2.c [c:UInt32]\
-        \n        TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for nested IN subqueries
-    #[test]
-    fn in_subquery_nested() -> Result<()> {
-        let table_scan = test_table_scan()?;
-
-        let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
-            .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
-            .project(vec![col("a")])?
-            .build()?;
-
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(in_subquery(col("b"), Arc::new(subquery)))?
-            .project(vec![col("test.b")])?
-            .build()?;
-
-        let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: sq.a [a:UInt32]\
-        \n      LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: sq_nested.c [c:UInt32]\
-        \n          TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-
-    /// Test for filter input modification in case filter not supported
-    /// Outer filter expression not modified while inner converted to join
-    #[test]
-    fn in_subquery_input_modified() -> Result<()> {
-        let table_scan = test_table_scan()?;
-        let plan = LogicalPlanBuilder::from(table_scan)
-            .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
-            .project(vec![col("b"), col("c")])?
-            .alias("wrapped")?
-            .filter(or(
-                binary_expr(col("b"), Operator::Lt, lit(30_u32)),
-                in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
-            ))?
-            .project(vec![col("b")])?
-            .build()?;
-
-        let expected = "Projection: wrapped.b [b:UInt32]\
-        \n  Filter: wrapped.b < UInt32(30) OR wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
-        \n    Subquery: [c:UInt32]\
-        \n      Projection: sq_outer.c [c:UInt32]\
-        \n        TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
-        \n      Projection: test.b, test.c [b:UInt32, c:UInt32]\
-        \n        LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n          Projection: sq_inner.c [c:UInt32]\
-        \n            TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
-
-        assert_optimized_plan_equal(&plan, expected)
-    }
-}