You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/28 21:03:03 UTC
[arrow-datafusion] branch master updated: remove `SubqueryFilterToJoin` (#4731)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fc8fb0631 remove `SubqueryFilterToJoin` (#4731)
fc8fb0631 is described below
commit fc8fb0631bf234c3575834adaea0e4a25c996004
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Dec 29 05:02:57 2022 +0800
remove `SubqueryFilterToJoin` (#4731)
* remove `SubqueryFilterToJoin`
* correct UT
---
datafusion/optimizer/src/decorrelate_where_in.rs | 188 ++++++++-
datafusion/optimizer/src/lib.rs | 1 -
datafusion/optimizer/src/optimizer.rs | 2 -
.../optimizer/src/subquery_filter_to_join.rs | 421 ---------------------
4 files changed, 187 insertions(+), 425 deletions(-)
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index bcc120723..c2a80ac2b 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -220,10 +220,196 @@ mod tests {
use crate::test::*;
use datafusion_common::Result;
use datafusion_expr::{
- col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery,
+ and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder,
+ not_in_subquery, or, Operator,
};
use std::ops::Add;
+ fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
+ assert_optimized_plan_eq_display_indent(
+ Arc::new(DecorrelateWhereIn::new()),
+ plan,
+ expected,
+ );
+ Ok(())
+ }
+
+ fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
+ let table_scan = test_table_scan_with_name(name)?;
+ Ok(Arc::new(
+ LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c")])?
+ .build()?,
+ ))
+ }
+
+ /// Test for several IN subquery expressions
+ #[test]
+ fn in_subquery_multiple() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(and(
+ in_subquery(col("c"), test_subquery_with_name("sq_1")?),
+ in_subquery(col("b"), test_subquery_with_name("sq_2")?),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: test.b [b:UInt32]\
+ \n LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n Projection: sq_1.c AS c [c:UInt32]\
+ \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_2 [c:UInt32]\
+ \n Projection: sq_2.c AS c [c:UInt32]\
+ \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
+ /// Test for IN subquery with additional AND filter
+ #[test]
+ fn in_subquery_with_and_filters() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(and(
+ in_subquery(col("c"), test_subquery_with_name("sq")?),
+ and(
+ binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ ),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: test.b [b:UInt32]\
+ \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n Projection: sq.c AS c [c:UInt32]\
+ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
+ /// Test for IN subquery with additional OR filter
+ /// filter expression not modified
+ #[test]
+ fn in_subquery_with_or_filters() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(or(
+ and(
+ binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ ),
+ in_subquery(col("c"), test_subquery_with_name("sq")?),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: test.b [b:UInt32]\
+ \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+ \n Subquery: [c:UInt32]\
+ \n Projection: sq.c [c:UInt32]\
+ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
+ #[test]
+ fn in_subquery_with_and_or_filters() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(and(
+ or(
+ binary_expr(col("a"), Operator::Eq, lit(1_u32)),
+ in_subquery(col("b"), test_subquery_with_name("sq1")?),
+ ),
+ in_subquery(col("c"), test_subquery_with_name("sq2")?),
+ ))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: test.b [b:UInt32]\
+ \n Filter: test.a = UInt32(1) OR test.b IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
+ \n Subquery: [c:UInt32]\
+ \n Projection: sq1.c [c:UInt32]\
+ \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n Projection: sq2.c AS c [c:UInt32]\
+ \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
+ /// Test for nested IN subqueries
+ #[test]
+ fn in_subquery_nested() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
+ .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
+ .project(vec![col("a")])?
+ .build()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(in_subquery(col("b"), Arc::new(subquery)))?
+ .project(vec![col("test.b")])?
+ .build()?;
+
+ let expected = "Projection: test.b [b:UInt32]\
+ \n LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_1 [a:UInt32]\
+ \n Projection: sq.a AS a [a:UInt32]\
+ \n LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_2 [c:UInt32]\
+ \n Projection: sq_nested.c AS c [c:UInt32]\
+ \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
+ /// Test for filter input modification in case filter not supported
+ /// Outer filter expression not modified while inner converted to join
+ #[test]
+ fn in_subquery_input_modified() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
+ .project(vec![col("b"), col("c")])?
+ .alias("wrapped")?
+ .filter(or(
+ binary_expr(col("b"), Operator::Lt, lit(30_u32)),
+ in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
+ ))?
+ .project(vec![col("b")])?
+ .build()?;
+
+ let expected = "Projection: wrapped.b [b:UInt32]\
+ \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
+ \n Subquery: [c:UInt32]\
+ \n Projection: sq_outer.c [c:UInt32]\
+ \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
+ \n Projection: test.b, test.c [b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+ \n SubqueryAlias: __sq_1 [c:UInt32]\
+ \n Projection: sq_inner.c AS c [c:UInt32]\
+ \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_equal(&plan, expected)
+ }
+
#[cfg(test)]
#[ctor::ctor]
fn init() {
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index b03725fe6..27e6dff08 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -33,7 +33,6 @@ pub mod push_down_projection;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
-pub mod subquery_filter_to_join;
pub mod type_coercion;
pub mod utils;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 36968f2f1..b27c49d2f 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -35,7 +35,6 @@ use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
use crate::simplify_expressions::SimplifyExpressions;
use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
-use crate::subquery_filter_to_join::SubqueryFilterToJoin;
use crate::type_coercion::TypeCoercion;
use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
use chrono::{DateTime, Utc};
@@ -244,7 +243,6 @@ impl Optimizer {
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(ScalarSubqueryToJoin::new()),
- Arc::new(SubqueryFilterToJoin::new()),
// simplify expressions does not simplify expressions in subqueries, so we
// run it again after running the optimizations that potentially converted
// subqueries to joins
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
deleted file mode 100644
index 696f911a1..000000000
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ /dev/null
@@ -1,421 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Optimizer rule for rewriting subquery filters to joins
-//!
-//! It handles standalone parts of logical conjunction expressions, i.e.
-//! ```text
-//! WHERE t1.f IN (SELECT f FROM t2) AND t2.f = 'x'
-//! ```
-//! will be rewritten, but
-//! ```text
-//! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x'
-//! ```
-//! won't
-use crate::optimizer::ApplyOrder;
-use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::{
- expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
- logical_plan::{
- builder::build_join_schema, Filter, Join, JoinConstraint, JoinType, LogicalPlan,
- },
- Expr,
-};
-use std::sync::Arc;
-
-/// Optimizer rule for rewriting subquery filters to joins
-#[derive(Default)]
-pub struct SubqueryFilterToJoin {}
-
-impl SubqueryFilterToJoin {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl OptimizerRule for SubqueryFilterToJoin {
- fn try_optimize(
- &self,
- plan: &LogicalPlan,
- _config: &dyn OptimizerConfig,
- ) -> Result<Option<LogicalPlan>> {
- match plan {
- LogicalPlan::Filter(filter) => {
- // Apply optimizer rule to current input
- let input = filter.input.as_ref().clone();
-
- // Splitting filter expression into components by AND
- let filters = utils::split_conjunction(&filter.predicate);
-
- // Searching for subquery-based filters
- let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
- filters
- .into_iter()
- .partition(|&e| matches!(e, Expr::InSubquery { .. }));
-
- // Check all subquery filters could be rewritten
- //
- // In case of expressions which could not be rewritten
- // return original filter with optimized input
- let mut subqueries_in_regular = vec![];
- regular_filters.iter().try_for_each(|&e| {
- extract_subquery_filters(e, &mut subqueries_in_regular)
- })?;
-
- if !subqueries_in_regular.is_empty() {
- return Ok(Some(LogicalPlan::Filter(Filter::try_new(
- filter.predicate.clone(),
- Arc::new(input),
- )?)));
- };
-
- // Add subquery joins to new_input
- // optimized_input value should retain for possible optimization rollback
- let opt_result = subquery_filters.iter().try_fold(
- input.clone(),
- |input, &e| match e {
- Expr::InSubquery {
- expr,
- subquery,
- negated,
- } => {
- let right_input = self.try_optimize(
- &subquery.subquery,
- _config
- )?.unwrap_or_else(||subquery.subquery.as_ref().clone());
- let right_schema = right_input.schema();
- if right_schema.fields().len() != 1 {
- return Err(DataFusionError::Plan(
- "Only single column allowed in InSubquery"
- .to_string(),
- ));
- };
-
- let right_key = right_schema.field(0).qualified_column();
- let left_key = match *expr.clone() {
- Expr::Column(col) => col,
- _ => return Err(DataFusionError::NotImplemented(
- "Filtering by expression not implemented for InSubquery"
- .to_string(),
- )),
- };
-
- let join_type = if *negated {
- JoinType::LeftAnti
- } else {
- JoinType::LeftSemi
- };
-
- let schema = build_join_schema(
- input.schema(),
- right_schema,
- &join_type,
- )?;
-
- Ok(LogicalPlan::Join(Join {
- left: Arc::new(input),
- right: Arc::new(right_input),
- on: vec![(Expr::Column(left_key), Expr::Column(right_key))],
- filter: None,
- join_type,
- join_constraint: JoinConstraint::On,
- schema: Arc::new(schema),
- null_equals_null: false,
- }))
- }
- _ => Err(DataFusionError::Plan(
- "Unknown expression while rewriting subquery to joins"
- .to_string(),
- )),
- }
- );
-
- // In case of expressions which could not be rewritten
- // return original filter with optimized input
- let new_input = match opt_result {
- Ok(plan) => plan,
- Err(_) => {
- return Ok(Some(LogicalPlan::Filter(Filter::try_new(
- filter.predicate.clone(),
- Arc::new(input),
- )?)))
- }
- };
-
- // Apply regular filters to join output if some or just return join
- if regular_filters.is_empty() {
- Ok(Some(new_input))
- } else {
- Ok(Some(utils::add_filter(new_input, &regular_filters)?))
- }
- }
- _ => Ok(None),
- }
- }
-
- fn name(&self) -> &str {
- "subquery_filter_to_join"
- }
-
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::TopDown)
- }
-}
-
-fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Result<()> {
- struct InSubqueryVisitor<'a> {
- accum: &'a mut Vec<Expr>,
- }
-
- impl ExpressionVisitor for InSubqueryVisitor<'_> {
- fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
- if let Expr::InSubquery { .. } = expr {
- self.accum.push(expr.to_owned());
- }
- Ok(Recursion::Continue(self))
- }
- }
-
- expression.accept(InSubqueryVisitor { accum: extracted })?;
- Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::test::*;
- use datafusion_expr::{
- and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder,
- not_in_subquery, or, Operator,
- };
-
- fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
- assert_optimized_plan_eq_display_indent(
- Arc::new(SubqueryFilterToJoin::new()),
- plan,
- expected,
- );
- Ok(())
- }
-
- fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
- let table_scan = test_table_scan_with_name(name)?;
- Ok(Arc::new(
- LogicalPlanBuilder::from(table_scan)
- .project(vec![col("c")])?
- .build()?,
- ))
- }
-
- /// Test for single IN subquery filter
- #[test]
- fn in_subquery_simple() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq.c [c:UInt32]\
- \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for single NOT IN subquery filter
- #[test]
- fn not_in_subquery_simple() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq.c [c:UInt32]\
- \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for several IN subquery expressions
- #[test]
- fn in_subquery_multiple() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(and(
- in_subquery(col("c"), test_subquery_with_name("sq_1")?),
- in_subquery(col("b"), test_subquery_with_name("sq_2")?),
- ))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq_1.c [c:UInt32]\
- \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq_2.c [c:UInt32]\
- \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for IN subquery with additional AND filter
- #[test]
- fn in_subquery_with_and_filters() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(and(
- in_subquery(col("c"), test_subquery_with_name("sq")?),
- and(
- binary_expr(col("a"), Operator::Eq, lit(1_u32)),
- binary_expr(col("b"), Operator::Lt, lit(30_u32)),
- ),
- ))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq.c [c:UInt32]\
- \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for IN subquery with additional OR filter
- /// filter expression not modified
- #[test]
- fn in_subquery_with_or_filters() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(or(
- and(
- binary_expr(col("a"), Operator::Eq, lit(1_u32)),
- binary_expr(col("b"), Operator::Lt, lit(30_u32)),
- ),
- in_subquery(col("c"), test_subquery_with_name("sq")?),
- ))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) OR test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
- \n Subquery: [c:UInt32]\
- \n Projection: sq.c [c:UInt32]\
- \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- #[test]
- fn in_subquery_with_and_or_filters() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(and(
- or(
- binary_expr(col("a"), Operator::Eq, lit(1_u32)),
- in_subquery(col("b"), test_subquery_with_name("sq1")?),
- ),
- in_subquery(col("c"), test_subquery_with_name("sq2")?),
- ))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n Filter: (test.a = UInt32(1) OR test.b IN (<subquery>)) AND test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
- \n Subquery: [c:UInt32]\
- \n Projection: sq1.c [c:UInt32]\
- \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
- \n Subquery: [c:UInt32]\
- \n Projection: sq2.c [c:UInt32]\
- \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for nested IN subqueries
- #[test]
- fn in_subquery_nested() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
- .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
- .project(vec![col("a")])?
- .build()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(in_subquery(col("b"), Arc::new(subquery)))?
- .project(vec![col("test.b")])?
- .build()?;
-
- let expected = "Projection: test.b [b:UInt32]\
- \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq.a [a:UInt32]\
- \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq_nested.c [c:UInt32]\
- \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-
- /// Test for filter input modification in case filter not supported
- /// Outer filter expression not modified while inner converted to join
- #[test]
- fn in_subquery_input_modified() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
- .project(vec![col("b"), col("c")])?
- .alias("wrapped")?
- .filter(or(
- binary_expr(col("b"), Operator::Lt, lit(30_u32)),
- in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
- ))?
- .project(vec![col("b")])?
- .build()?;
-
- let expected = "Projection: wrapped.b [b:UInt32]\
- \n Filter: wrapped.b < UInt32(30) OR wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
- \n Subquery: [c:UInt32]\
- \n Projection: sq_outer.c [c:UInt32]\
- \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
- \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
- \n Projection: test.b, test.c [b:UInt32, c:UInt32]\
- \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
- \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
- \n Projection: sq_inner.c [c:UInt32]\
- \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
-
- assert_optimized_plan_equal(&plan, expected)
- }
-}