You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/19 11:05:23 UTC
[arrow-datafusion] branch main updated: Port test in subqueries.rs from rust to sqllogictest (#6675)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 873d4178ab Port test in subqueries.rs from rust to sqllogictest (#6675)
873d4178ab is described below
commit 873d4178ab3c98ca6f78ef553c8e33e12dd342fb
Author: zhenxing jiang <ji...@gmail.com>
AuthorDate: Mon Jun 19 19:05:17 2023 +0800
Port test in subqueries.rs from rust to sqllogictest (#6675)
* port correlated_recursive_scalar_subquery to sql
* port correlated_where_in to sql
* port exists_subquery_with_same_table to sql
* port in_subquery_with_same_table to sql
* port in_subquery_nested_exist_subquery to sql
* port invalid_scalar_subquery to sql
* port subquery_not_allowed to sql
* port non_aggregated_correlated_scalar_subquery to sql
* port non_aggregated_correlated_scalar_subquery_with_limit to sql
* port non_aggregated_correlated_scalar_subquery_with_single_row to sql
* port non_equal_correlated_scalar_subquery to sql
* port aggregated_correlated_scalar_subquery_with_extra_group_by_columns to sql
* port support_agg_correlated_columns to sql
* port support_agg_correlated_columns2 to sql
* port support_join_correlated_columns to sql
* port subquery_contains_join_contains_correlated_columns to sql
* port subquery_contains_join_contains_sub_query_alias_correlated_columns to sql
* port support_order_by_correlated_columns to sql
* port exists_subquery_with_select_null to sql
* port exists_subquery_with_limit to sql
* port exists_subquery_with_limit0 to sql
* port not_exists_subquery_with_limit0 to sql
* port in_correlated_subquery_with_limit to sql
* port in_non_correlated_subquery_with_limit to sql
* port uncorrelated_scalar_subquery_with_limit0 to sql
* port support_union_subquery to sql
* port simple_uncorrelated_scalar_subquery to sql
* port simple_uncorrelated_scalar_subquery2 to sql
* port correlated_scalar_subquery_count_agg to sql
* port correlated_scalar_subquery_count_agg2 to sql
* port correlated_scalar_subquery_count_agg_with_alias to sql
* port correlated_scalar_subquery_count_agg_complex_expr to sql
* port correlated_scalar_subquery_count_agg_where_clause to sql
* port correlated_scalar_subquery_count_agg_with_having to sql
* port correlated_scalar_subquery_count_agg_with_pull_up_having to sql
* port correlated_scalar_subquery_count_agg_in_having to sql
* port correlated_scalar_subquery_count_agg_in_nested_projection to sql
* port correlated_scalar_subquery_count_agg_in_nested_subquery to sql
* port correlated_scalar_subquery_count_agg_in_case_when to sql
* clippy fix
---
datafusion/core/tests/sql/mod.rs | 76 --
datafusion/core/tests/sql/subqueries.rs | 1372 --------------------
.../tests/sqllogictests/test_files/subquery.slt | 724 ++++++++++-
3 files changed, 723 insertions(+), 1449 deletions(-)
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 5fa4b2383b..ca0cfc3dbe 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -201,82 +201,6 @@ fn create_join_context(
Ok(ctx)
}
-fn create_sub_query_join_context(
- column_outer: &str,
- column_inner_left: &str,
- column_inner_right: &str,
- repartition_joins: bool,
-) -> Result<SessionContext> {
- let ctx = SessionContext::with_config(
- SessionConfig::new()
- .with_repartition_joins(repartition_joins)
- .with_target_partitions(2)
- .with_batch_size(4096),
- );
-
- let t0_schema = Arc::new(Schema::new(vec![
- Field::new(column_outer, DataType::UInt32, true),
- Field::new("t0_name", DataType::Utf8, true),
- Field::new("t0_int", DataType::UInt32, true),
- ]));
- let t0_data = RecordBatch::try_new(
- t0_schema,
- vec![
- Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
- Arc::new(StringArray::from(vec![
- Some("a"),
- Some("b"),
- Some("c"),
- Some("d"),
- ])),
- Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
- ],
- )?;
- ctx.register_batch("t0", t0_data)?;
-
- let t1_schema = Arc::new(Schema::new(vec![
- Field::new(column_inner_left, DataType::UInt32, true),
- Field::new("t1_name", DataType::Utf8, true),
- Field::new("t1_int", DataType::UInt32, true),
- ]));
- let t1_data = RecordBatch::try_new(
- t1_schema,
- vec![
- Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
- Arc::new(StringArray::from(vec![
- Some("a"),
- Some("b"),
- Some("c"),
- Some("d"),
- ])),
- Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
- ],
- )?;
- ctx.register_batch("t1", t1_data)?;
-
- let t2_schema = Arc::new(Schema::new(vec![
- Field::new(column_inner_right, DataType::UInt32, true),
- Field::new("t2_name", DataType::Utf8, true),
- Field::new("t2_int", DataType::UInt32, true),
- ]));
- let t2_data = RecordBatch::try_new(
- t2_schema,
- vec![
- Arc::new(UInt32Array::from(vec![11, 22, 44, 55])),
- Arc::new(StringArray::from(vec![
- Some("z"),
- Some("y"),
- Some("x"),
- Some("w"),
- ])),
- Arc::new(UInt32Array::from(vec![3, 1, 3, 3])),
- ],
- )?;
- ctx.register_batch("t2", t2_data)?;
-
- Ok(ctx)
-}
-
fn create_left_semi_anti_join_context_with_null_ids(
column_left: &str,
column_right: &str,
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 1fd627bc77..7d38b9173c 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -17,1109 +17,6 @@
use super::*;
use crate::sql::execute_to_batches;
-use datafusion::assert_batches_eq;
-use datafusion::prelude::SessionContext;
-use log::debug;
-
-#[tokio::test]
-async fn correlated_recursive_scalar_subquery() -> Result<()> {
- let ctx = SessionContext::new();
- register_tpch_csv(&ctx, "customer").await?;
- register_tpch_csv(&ctx, "orders").await?;
- register_tpch_csv(&ctx, "lineitem").await?;
-
- let sql = r#"
-select c_custkey from customer
-where c_acctbal < (
- select sum(o_totalprice) from orders
- where o_custkey = c_custkey
- and o_totalprice < (
- select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey
- )
-) order by c_custkey;"#;
-
- // assert plan
- let dataframe = ctx.sql(sql).await.unwrap();
- debug!("input:\n{}", dataframe.logical_plan().display_indent());
-
- let plan = dataframe.into_optimized_plan().unwrap();
- let actual = format!("{}", plan.display_indent());
- let expected = "Sort: customer.c_custkey ASC NULLS LAST\
- \n Projection: customer.c_custkey\
- \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.SUM(orders.o_totalprice)\
- \n TableScan: customer projection=[c_custkey, c_acctbal]\
- \n SubqueryAlias: __scalar_sq_1\
- \n Projection: SUM(orders.o_totalprice), orders.o_custkey\
- \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\
- \n Projection: orders.o_custkey, orders.o_totalprice\
- \n Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.price\
- \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\
- \n SubqueryAlias: __scalar_sq_2\
- \n Projection: SUM(lineitem.l_extendedprice) AS price, lineitem.l_orderkey\
- \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\
- \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]";
- assert_eq!(actual, expected);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_where_in() -> Result<()> {
- let orders = r#"1,3691,O,194029.55,1996-01-02,5-LOW,Clerk#000000951,0,
-65,1627,P,99763.79,1995-03-18,1-URGENT,Clerk#000000632,0,
-"#;
- let lineitems = r#"1,15519,785,1,17,24386.67,0.04,0.02,N,O,1996-03-13,1996-02-12,1996-03-22,DELIVER IN PERSON,TRUCK,
-1,6731,732,2,36,58958.28,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,
-65,5970,481,1,26,48775.22,0.03,0.03,A,F,1995-04-20,1995-04-25,1995-05-13,NONE,TRUCK,
-65,7382,897,2,22,28366.36,0,0.05,N,O,1995-07-17,1995-06-04,1995-07-19,COLLECT COD,FOB,
-"#;
-
- let ctx = SessionContext::new();
- register_tpch_csv_data(&ctx, "orders", orders).await?;
- register_tpch_csv_data(&ctx, "lineitem", lineitems).await?;
-
- let sql = r#"select o_orderkey from orders
-where o_orderstatus in (
- select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
-);"#;
-
- // assert plan
- let dataframe = ctx.sql(sql).await.unwrap();
- let plan = dataframe.into_optimized_plan().unwrap();
- let actual = format!("{}", plan.display_indent());
-
- let expected = "Projection: orders.o_orderkey\
- \n LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey\
- \n TableScan: orders projection=[o_orderkey, o_orderstatus]\
- \n SubqueryAlias: __correlated_sq_1\
- \n Projection: lineitem.l_linestatus, lineitem.l_orderkey\
- \n TableScan: lineitem projection=[l_orderkey, l_linestatus]";
- assert_eq!(actual, expected);
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------------+",
- "| o_orderkey |",
- "+------------+",
- "| 1 |",
- "+------------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_same_table() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- // Subquery and outer query refer to the same table.
- // It will not be rewritten to join because it is not a correlated subquery.
- let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " Subquery: [t1_int:UInt32;N]",
- " Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn in_subquery_with_same_table() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- // Subquery and outer query refer to the same table.
- // It will be rewritten to join because in-subquery has extra predicate(`t1.t1_id = __correlated_sq_1.t1_int`).
- let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " LeftSemi Join: t1.t1_id = __correlated_sq_1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " SubqueryAlias: __correlated_sq_1 [t1_int:UInt32;N]",
- " Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn in_subquery_nested_exist_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t2_id FROM t2 WHERE EXISTS(select * from t1 WHERE t1.t1_int > t2.t2_int))";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
- " Projection: t2.t2_id [t2_id:UInt32;N]",
- " LeftSemi Join: Filter: __correlated_sq_2.t1_int > t2.t2_int [t2_id:UInt32;N, t2_int:UInt32;N]",
- " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
- " SubqueryAlias: __correlated_sq_2 [t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn invalid_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn subquery_not_allowed() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- // In/Exist Subquery is not allowed in ORDER BY clause.
- let sql = "SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
- &format!("{err:?}")
- );
-
- let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery_with_limit() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery_with_single_row() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, (<subquery>) AS t2_int [t1_id:UInt32;N, t2_int:UInt32;N]",
- " Subquery: [t2_int:UInt32;N]",
- " Limit: skip=0, fetch=1 [t2_int:UInt32;N]",
- " Projection: t2.t2_int [t2_int:UInt32;N]",
- " Filter: t2.t2_int = outer_ref(t1.t1_int) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let sql = "SELECT t1_id from t1 where t1_int = (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id [t1_id:UInt32;N]",
- " Filter: t1.t1_int = (<subquery>) [t1_id:UInt32;N, t1_int:UInt32;N]",
- " Subquery: [t2_int:UInt32;N]",
- " Limit: skip=0, fetch=1 [t2_int:UInt32;N]",
- " Projection: t2.t2_int [t2_int:UInt32;N]",
- " Filter: t2.t2_int = outer_ref(t1.t1_int) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let sql = "SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_5.a AS t2_int [t1_id:UInt32;N, t2_int:Int64;N]",
- " Left Join: CAST(t1.t1_int AS Int64) = __scalar_sq_5.a [t1_id:UInt32;N, t1_int:UInt32;N, a:Int64;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_5 [a:Int64]",
- " Projection: Int64(1) AS a [a:Int64]",
- " EmptyRelation []",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // TODO infer nullability in the schema has bug
- // // assert data
- // let results = execute_to_batches(&ctx, sql).await;
- // let expected = vec![
- // "+-------+--------+",
- // "| t1_id | t2_int |",
- // "+-------+--------+",
- // "| 22 | |",
- // "| 33 | |",
- // "| 11 | 1 |",
- // "| 44 | |",
- // "+-------+--------+",
- // ];
- // assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn non_equal_correlated_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id < t1.t1_id) as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Correlated column is not allowed in predicate: t2.t2_id < outer_ref(t1.t1_id)"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() -> Result<()>
-{
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn support_agg_correlated_columns() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
- " Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id) [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
- " Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
- " Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn support_agg_correlated_columns2() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [COUNT(UInt8(1)):Int64;N]",
- " Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]",
- " Filter: CAST(SUM(outer_ref(t1.t1_int) + t2.t2_id) AS Int64) > Int64(0) [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
- " Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
- " Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn support_join_correlated_columns() -> Result<()> {
- let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
- let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
- " Subquery: [Int64(1):Int64]",
- " Projection: Int64(1) [Int64(1):Int64]",
- " Inner Join: Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn subquery_contains_join_contains_correlated_columns() -> Result<()> {
- let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
- let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "LeftSemi Join: t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
- " TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
- " SubqueryAlias: __correlated_sq_1 [t2_name:Utf8;N]",
- " Projection: t2.t2_name [t2_name:Utf8;N]",
- " Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N]",
- " TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn subquery_contains_join_contains_sub_query_alias_correlated_columns() -> Result<()>
-{
- let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
- let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (select 1 from (SELECT * FROM t1 where t1.t1_id = t0.t0_id) as x INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as y ON(x.t1_id = y.t2_id))";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "LeftSemi Join: t0.t0_id = __correlated_sq_1.t1_id, t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
- " TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
- " SubqueryAlias: __correlated_sq_1 [t1_id:UInt32;N, t2_name:Utf8;N]",
- " Projection: x.t1_id, y.t2_name [t1_id:UInt32;N, t2_name:Utf8;N]",
- " Inner Join: x.t1_id = y.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
- " SubqueryAlias: x [t1_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: y [t2_id:UInt32;N, t2_name:Utf8;N]",
- " TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn support_order_by_correlated_columns() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id >= t1_id order by t1_id)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Sort: outer_ref(t1.t1_id) ASC NULLS LAST [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Filter: t2.t2_id >= outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_select_null() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [NULL:Null;N]",
- " Projection: NULL [NULL:Null;N]",
- " EmptyRelation []",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_limit() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // de-correlated, limit is removed
- let expected = vec![
- "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- " SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
- " TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+",
- "| t1_id | t1_name |",
- "+-------+---------+",
- "| 11 | a |",
- "| 22 | b |",
- "| 44 | d |",
- "+-------+---------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_limit0() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // de-correlated, limit is removed and replaced with EmptyRelation
- let expected = vec![
- "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- " EmptyRelation [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec!["++", "++"];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn not_exists_subquery_with_limit0() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // de-correlated, limit is removed and replaced with EmptyRelation
- let expected = vec![
- "LeftAnti Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- " EmptyRelation [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+---------+",
- "| t1_id | t1_name |",
- "+-------+---------+",
- "| 11 | a |",
- "| 22 | b |",
- "| 33 | c |",
- "| 44 | d |",
- "+-------+---------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn in_correlated_subquery_with_limit() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 where t1_name = t2_name limit 10)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // not de-correlated
- let expected = vec![
- "Filter: t1.t1_id IN (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [t2_id:UInt32;N]",
- " Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
- " Projection: t2.t2_id [t2_id:UInt32;N]",
- " Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn in_non_correlated_subquery_with_limit() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql =
- "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // de-correlated, limit is kept
- let expected = vec![
- "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- " SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
- " Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
- " TableScan: t2 projection=[t2_id], fetch=10 [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn uncorrelated_scalar_subquery_with_limit0() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.t2_id AS t2_id [t1_id:UInt32;N, t2_id:UInt32;N]",
- " Left Join: [t1_id:UInt32;N, t2_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " EmptyRelation [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+-------+",
- "| t1_id | t2_id |",
- "+-------+-------+",
- "| 11 | |",
- "| 22 | |",
- "| 33 | |",
- "| 44 | |",
- "+-------+-------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn support_union_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS \
- (SELECT * FROM t2 WHERE t2_id = t1_id UNION ALL \
- SELECT * FROM t2 WHERE upper(t2_name) = upper(t1.t1_name))";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
- " Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Union [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " Filter: upper(t2.t2_name) = upper(outer_ref(t1.t1_name)) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn simple_uncorrelated_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select (select count(*) from t1) as b";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b [b:Int64;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N]",
- " Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] [COUNT(UInt8(1)):Int64;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec!["+---+", "| b |", "+---+", "| 4 |", "+---+"];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn simple_uncorrelated_scalar_subquery2() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select (select count(*) from t1) as b, (select count(1) from t2)";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b, __scalar_sq_2.COUNT(Int64(1)) AS COUNT(Int64(1)) [b:Int64;N, COUNT(Int64(1)):Int64;N]",
- " Left Join: [COUNT(UInt8(1)):Int64;N, COUNT(Int64(1)):Int64;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N]",
- " Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] [COUNT(UInt8(1)):Int64;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: __scalar_sq_2 [COUNT(Int64(1)):Int64;N]",
- " Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]] [COUNT(Int64(1)):Int64;N]",
- " TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+---+-----------------+",
- "| b | COUNT(Int64(1)) |",
- "+---+-----------------+",
- "| 4 | 4 |",
- "+---+-----------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql =
- "SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS COUNT(UInt8(1)) [t1_id:UInt32;N, COUNT(UInt8(1)):Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+-----------------+",
- "| t1_id | COUNT(UInt8(1)) |",
- "+-------+-----------------+",
- "| 33 | 3 |",
- "| 22 | 0 |",
- "| 11 | 1 |",
- "| 44 | 0 |",
- "+-------+-----------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg2() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS cnt [t1_id:UInt32;N, cnt:Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+-----+",
- "| t1_id | cnt |",
- "+-------+-----+",
- "| 33 | 3 |",
- "| 22 | 0 |",
- "| 11 | 1 |",
- "| 44 | 0 |",
- "+-------+-----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_alias() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) AS _cnt ELSE __scalar_sq_1._cnt END AS cnt [t1_id:UInt32;N, cnt:Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, _cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)) AS _cnt, t2.t2_int, __always_true [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+-----+",
- "| t1_id | cnt |",
- "+-------+-----+",
- "| 33 | 3 |",
- "| 22 | 0 |",
- "| 11 | 1 |",
- "| 44 | 0 |",
- "+-------+-----+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_complex_expr() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) AS _cnt ELSE __scalar_sq_1._cnt END AS _cnt [t1_id:UInt32;N, _cnt:Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, _cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)) + Int64(2) AS _cnt, t2.t2_int, __always_true [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+------+",
- "| t1_id | _cnt |",
- "+-------+------+",
- "| 11 | 3 |",
- "| 22 | 2 |",
- "| 33 | 5 |",
- "| 44 | 2 |",
- "+-------+------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_where_clause() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)), t2.t2_id, __always_true [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_id:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------+",
- "| t1_int |",
- "+--------+",
- "| 2 |",
- "| 4 |",
- "| 3 |",
- "+--------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
#[tokio::test]
#[ignore]
@@ -1164,272 +61,3 @@ async fn correlated_scalar_subquery_sum_agg_bug() -> Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_having() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // the having condition is kept as the normal filter condition, no need to pull up
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.cnt_plus_2 AS cnt_plus_2 [t1_id:UInt32;N, cnt_plus_2:Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, cnt_plus_2:Int64;N, t2_int:UInt32;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [cnt_plus_2:Int64;N, t2_int:UInt32;N]",
- " Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int [cnt_plus_2:Int64;N, t2_int:UInt32;N]",
- " Filter: COUNT(UInt8(1)) > Int64(1) [t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N]",
- " Projection: t2.t2_int, COUNT(UInt8(1)) [t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+------------+",
- "| t1_id | cnt_plus_2 |",
- "+-------+------------+",
- "| 11 | |",
- "| 22 | |",
- "| 33 | 5 |",
- "| 44 | |",
- "+-------+------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_pull_up_having() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // the having condition need to pull up and evaluated after the left out join
- let expected = vec![
- "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) AS cnt_plus_2 WHEN __scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_1.cnt_plus_2 END AS cnt_plus_2 [t1_id:UInt32;N, cnt_plus_2:Int64;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int, COUNT(UInt8(1)), __always_true [cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+------------+",
- "| t1_id | cnt_plus_2 |",
- "+-------+------------+",
- "| 11 | |",
- "| 22 | 2 |",
- "| 33 | |",
- "| 44 | 2 |",
- "+-------+------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_having() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END = Int64(0) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " Aggregate: groupBy=[[t1.t1_int]], aggr=[[]] [t1_int:UInt32;N]",
- " TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------+",
- "| t1_int |",
- "+--------+",
- "| 2 |",
- "| 4 |",
- "+--------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_nested_projection() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.cnt END = Int64(0) [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
- " Projection: t1.t1_int, __scalar_sq_1.cnt, __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)) AS cnt, t2.t2_int, __always_true [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------+",
- "| t1_int |",
- "+--------+",
- "| 2 |",
- "| 4 |",
- "+--------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_nested_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select t1.t1_int from t1 where \
- (select cnt_plus_one + 1 as cnt_plus_two from \
- (select cnt + 1 as cnt_plus_one from \
- (select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0)\
- )\
- ) = 2";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- // pull up the deeply nested having condition
- let expected = vec![
- "Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) WHEN __scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_1.cnt_plus_two END = Int64(2) [t1_int:UInt32;N, cnt_plus_two:Int64;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Projection: t1.t1_int, __scalar_sq_1.cnt_plus_two, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt_plus_two:Int64;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
- " Projection: COUNT(UInt8(1)) + Int64(1) + Int64(1) AS cnt_plus_two, t2.t2_int, COUNT(UInt8(1)), __always_true [cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------+",
- "| t1_int |",
- "+--------+",
- "| 2 |",
- "| 4 |",
- "+--------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_case_when() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "select t1.t1_int from t1 where \
- (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int)\
- = 0";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_int [t1_int:UInt32;N]",
- " Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.cnt END = Int64(0) [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
- " Projection: t1.t1_int, __scalar_sq_1.cnt, __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
- " Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
- " TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Projection: CASE WHEN COUNT(UInt8(1)) = Int64(1) THEN Int64(NULL) ELSE COUNT(UInt8(1)) END AS cnt, t2.t2_int, __always_true [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
- " Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
- " TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------+",
- "| t1_int |",
- "+--------+",
- "| 2 |",
- "| 4 |",
- "+--------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
index b47901b277..1961a5840d 100644
--- a/datafusion/core/tests/sqllogictests/test_files/subquery.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -19,7 +19,18 @@
## Subquery Tests
#############
-# two tables for subquery
+
+#############
+## Setup test data table
+#############
+# three tables for subquery
+statement ok
+CREATE TABLE t0(t0_id INT, t0_name TEXT, t0_int INT) AS VALUES
+(11, 'o', 6),
+(22, 'p', 7),
+(33, 'q', 8),
+(44, 'r', 9);
+
statement ok
CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
(11, 'a', 1),
@@ -34,6 +45,50 @@ CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
(44, 'x', 3),
(55, 'w', 3);
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS customer (
+ c_custkey BIGINT,
+ c_name VARCHAR,
+ c_address VARCHAR,
+ c_nationkey BIGINT,
+ c_phone VARCHAR,
+ c_acctbal DECIMAL(15, 2),
+ c_mktsegment VARCHAR,
+ c_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/customer.csv';
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS orders (
+ o_orderkey BIGINT,
+ o_custkey BIGINT,
+ o_orderstatus VARCHAR,
+ o_totalprice DECIMAL(15, 2),
+ o_orderdate DATE,
+ o_orderpriority VARCHAR,
+ o_clerk VARCHAR,
+ o_shippriority INTEGER,
+ o_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/orders.csv';
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
+ l_orderkey BIGINT,
+ l_partkey BIGINT,
+ l_suppkey BIGINT,
+ l_linenumber INTEGER,
+ l_quantity DECIMAL(15, 2),
+ l_extendedprice DECIMAL(15, 2),
+ l_discount DECIMAL(15, 2),
+ l_tax DECIMAL(15, 2),
+ l_returnflag VARCHAR,
+ l_linestatus VARCHAR,
+ l_shipdate DATE,
+ l_commitdate DATE,
+ l_receiptdate DATE,
+ l_shipinstruct VARCHAR,
+ l_shipmode VARCHAR,
+ l_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/lineitem.csv';
# in_subquery_to_join_with_correlated_outer_filter
query ITI rowsort
@@ -248,3 +303,670 @@ SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t
22 1
33 NULL
44 NULL
+
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+# correlated_recursive_scalar_subquery
+query TT
+explain select c_custkey from customer
+where c_acctbal < (
+ select sum(o_totalprice) from orders
+ where o_custkey = c_custkey
+ and o_totalprice < (
+ select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey
+ )
+) order by c_custkey;
+----
+logical_plan
+Sort: customer.c_custkey ASC NULLS LAST
+--Projection: customer.c_custkey
+----Inner Join: customer.c_custkey = __scalar_sq_10.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_10.SUM(orders.o_totalprice)
+------TableScan: customer projection=[c_custkey, c_acctbal]
+------SubqueryAlias: __scalar_sq_10
+--------Projection: SUM(orders.o_totalprice), orders.o_custkey
+----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]
+------------Projection: orders.o_custkey, orders.o_totalprice
+--------------Inner Join: orders.o_orderkey = __scalar_sq_11.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_11.price
+----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
+----------------SubqueryAlias: __scalar_sq_11
+------------------Projection: SUM(lineitem.l_extendedprice) AS price, lineitem.l_orderkey
+--------------------Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]
+----------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice]
+
+# correlated_where_in
+query TT
+explain select o_orderkey from orders
+where o_orderstatus in (
+ select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
+);
+----
+logical_plan
+Projection: orders.o_orderkey
+--LeftSemi Join: orders.o_orderstatus = __correlated_sq_6.l_linestatus, orders.o_orderkey = __correlated_sq_6.l_orderkey
+----TableScan: orders projection=[o_orderkey, o_orderstatus]
+----SubqueryAlias: __correlated_sq_6
+------Projection: lineitem.l_linestatus, lineitem.l_orderkey
+--------TableScan: lineitem projection=[l_orderkey, l_linestatus]
+
+query I rowsort
+select o_orderkey from orders
+where o_orderstatus in (
+ select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
+);
+----
+2
+3
+
+#exists_subquery_with_same_table
+#Subquery and outer query refer to the same table.
+#It will not be rewritten to join because it is not a correlated subquery.
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: t1.t1_int
+------Filter: t1.t1_id > t1.t1_int
+--------TableScan: t1
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+
+
+#in_subquery_with_same_table
+#Subquery and outer query refer to the same table.
+#It will be rewritten to join because in-subquery has extra predicate(`t1.t1_id = __correlated_sq_10.t1_int`).
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_10.t1_int
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+--SubqueryAlias: __correlated_sq_10
+----Projection: t1.t1_int
+------Filter: t1.t1_id > t1.t1_int
+--------TableScan: t1 projection=[t1_id, t1_int]
+
+#in_subquery_nested_exist_subquery
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t2_id FROM t2 WHERE EXISTS(select * from t1 WHERE t1.t1_int > t2.t2_int))
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_11.t2_id
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+--SubqueryAlias: __correlated_sq_11
+----Projection: t2.t2_id
+------LeftSemi Join: Filter: __correlated_sq_12.t1_int > t2.t2_int
+--------TableScan: t2 projection=[t2_id, t2_int]
+--------SubqueryAlias: __correlated_sq_12
+----------TableScan: t1 projection=[t1_int]
+
+#invalid_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
+SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1
+
+#subquery_not_allowed
+#In/Exist Subquery is not allowed in ORDER BY clause.
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes
+SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int)
+
+#non_aggregated_correlated_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1
+
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1
+
+#non_aggregated_correlated_scalar_subquery_with_limit
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1
+
+#non_aggregated_correlated_scalar_subquery_with_single_row
+query TT
+explain SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1
+----
+logical_plan
+Projection: t1.t1_id, (<subquery>) AS t2_int
+--Subquery:
+----Limit: skip=0, fetch=1
+------Projection: t2.t2_int
+--------Filter: t2.t2_int = outer_ref(t1.t1_int)
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id]
+
+query TT
+explain SELECT t1_id from t1 where t1_int = (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1)
+----
+logical_plan
+Projection: t1.t1_id
+--Filter: t1.t1_int = (<subquery>)
+----Subquery:
+------Limit: skip=0, fetch=1
+--------Projection: t2.t2_int
+----------Filter: t2.t2_int = outer_ref(t1.t1_int)
+------------TableScan: t2
+----TableScan: t1 projection=[t1_id, t1_int]
+
+query TT
+explain SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_16.a AS t2_int
+--Left Join: CAST(t1.t1_int AS Int64) = __scalar_sq_16.a
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_16
+------Projection: Int64(1) AS a
+--------EmptyRelation
+
+query II rowsort
+SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1
+----
+11 1
+22 NULL
+33 NULL
+44 NULL
+
+#non_equal_correlated_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated column is not allowed in predicate: t2\.t2_id < outer_ref\(t1\.t1_id\)
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id < t1.t1_id) as t2_sum from t1
+
+#aggregated_correlated_scalar_subquery_with_extra_group_by_columns
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1
+
+#support_agg_correlated_columns
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id)
+------Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+--------Filter: outer_ref(t1.t1_name) = t2.t2_name
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#support_agg_correlated_columns2
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: COUNT(UInt8(1))
+------Filter: SUM(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
+--------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+----------Filter: outer_ref(t1.t1_name) = t2.t2_name
+------------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#support_join_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: Int64(1)
+------Inner Join: Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name)
+--------TableScan: t1
+--------TableScan: t2
+--TableScan: t0 projection=[t0_id, t0_name]
+
+#subquery_contains_join_contains_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))
+----
+logical_plan
+LeftSemi Join: t0.t0_name = __correlated_sq_19.t2_name
+--TableScan: t0 projection=[t0_id, t0_name]
+--SubqueryAlias: __correlated_sq_19
+----Projection: t2.t2_name
+------Inner Join: t1.t1_id = t2.t2_id
+--------TableScan: t1 projection=[t1_id]
+--------SubqueryAlias: t2
+----------TableScan: t2 projection=[t2_id, t2_name]
+
+#subquery_contains_join_contains_sub_query_alias_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (select 1 from (SELECT * FROM t1 where t1.t1_id = t0.t0_id) as x INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as y ON(x.t1_id = y.t2_id))
+----
+logical_plan
+LeftSemi Join: t0.t0_id = __correlated_sq_20.t1_id, t0.t0_name = __correlated_sq_20.t2_name
+--TableScan: t0 projection=[t0_id, t0_name]
+--SubqueryAlias: __correlated_sq_20
+----Projection: x.t1_id, y.t2_name
+------Inner Join: x.t1_id = y.t2_id
+--------SubqueryAlias: x
+----------TableScan: t1 projection=[t1_id]
+--------SubqueryAlias: y
+----------TableScan: t2 projection=[t2_id, t2_name]
+
+#support_order_by_correlated_columns
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id >= t1_id order by t1_id)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Sort: outer_ref(t1.t1_id) ASC NULLS LAST
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: t2.t2_id >= outer_ref(t1.t1_id)
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#exists_subquery_with_select_null
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: NULL
+------EmptyRelation
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#exists_subquery_with_limit
+#de-correlated, limit is removed
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_25.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--SubqueryAlias: __correlated_sq_25
+----TableScan: t2 projection=[t2_id]
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)
+----
+11 a
+22 b
+44 d
+
+#exists_subquery_with_limit0
+#de-correlated, limit is removed and replaced with EmptyRelation
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_27.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--EmptyRelation
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+
+
+#not_exists_subquery_with_limit0
+#de-correlated, limit is removed and replaced with EmptyRelation
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+logical_plan
+LeftAnti Join: t1.t1_id = __correlated_sq_29.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--EmptyRelation
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+11 a
+22 b
+33 c
+44 d
+
+#in_correlated_subquery_with_limit
+#not de-correlated
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 where t1_name = t2_name limit 10)
+----
+logical_plan
+Filter: t1.t1_id IN (<subquery>)
+--Subquery:
+----Limit: skip=0, fetch=10
+------Projection: t2.t2_id
+--------Filter: outer_ref(t1.t1_name) = t2.t2_name
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#in_non_correlated_subquery_with_limit
+#de-correlated, limit is kept
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_33.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--SubqueryAlias: __correlated_sq_33
+----Limit: skip=0, fetch=10
+------TableScan: t2 projection=[t2_id], fetch=10
+
+
+#uncorrelated_scalar_subquery_with_limit0
+query TT
+explain SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_18.t2_id AS t2_id
+--Left Join:
+----TableScan: t1 projection=[t1_id]
+----EmptyRelation
+
+query II rowsort
+SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
+----
+11 NULL
+22 NULL
+33 NULL
+44 NULL
+
+#support_union_subquery
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id UNION ALL SELECT * FROM t2 WHERE upper(t2_name) = upper(t1.t1_name))
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Union
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: t2.t2_id = outer_ref(t1.t1_id)
+----------TableScan: t2
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: upper(t2.t2_name) = upper(outer_ref(t1.t1_name))
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#simple_uncorrelated_scalar_subquery
+query TT
+explain select (select count(*) from t1) as b
+----
+logical_plan
+Projection: __scalar_sq_20.COUNT(UInt8(1)) AS b
+--SubqueryAlias: __scalar_sq_20
+----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+------TableScan: t1 projection=[t1_id]
+
+#simple_uncorrelated_scalar_subquery2
+query TT
+explain select (select count(*) from t1) as b, (select count(1) from t2)
+----
+logical_plan
+Projection: __scalar_sq_21.COUNT(UInt8(1)) AS b, __scalar_sq_22.COUNT(Int64(1)) AS COUNT(Int64(1))
+--Left Join:
+----SubqueryAlias: __scalar_sq_21
+------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+--------TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_22
+------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
+--------TableScan: t2 projection=[t2_id]
+
+query II
+select (select count(*) from t1) as b, (select count(1) from t2)
+----
+4 4
+
+#correlated_scalar_subquery_count_agg
+query TT
+explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_25.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_25.COUNT(UInt8(1)) END AS COUNT(UInt8(1))
+--Left Join: t1.t1_int = __scalar_sq_25.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_25
+------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+
+#correlated_scalar_subquery_count_agg2
+query TT
+explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_27.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_27.COUNT(UInt8(1)) END AS cnt
+--Left Join: t1.t1_int = __scalar_sq_27.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_27
+------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+#correlated_scalar_subquery_count_agg_with_alias
+query TT
+explain SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_29.__always_true IS NULL THEN Int64(0) AS _cnt ELSE __scalar_sq_29._cnt END AS cnt
+--Left Join: t1.t1_int = __scalar_sq_29.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_29
+------Projection: COUNT(UInt8(1)) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+#correlated_scalar_subquery_count_agg_complex_expr
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_31.__always_true IS NULL THEN Int64(2) AS _cnt ELSE __scalar_sq_31._cnt END AS _cnt
+--Left Join: t1.t1_int = __scalar_sq_31.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_31
+------Projection: COUNT(UInt8(1)) + Int64(2) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+11 3
+22 2
+33 5
+44 2
+
+#correlated_scalar_subquery_count_agg_where_clause
+query TT
+explain select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_33.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_33.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64)
+----Projection: t1.t1_int, __scalar_sq_33.COUNT(UInt8(1)), __scalar_sq_33.__always_true
+------Left Join: t1.t1_id = __scalar_sq_33.t2_id
+--------TableScan: t1 projection=[t1_id, t1_int]
+--------SubqueryAlias: __scalar_sq_33
+----------Projection: COUNT(UInt8(1)), t2.t2_id, __always_true
+------------Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_id]
+
+query I rowsort
+select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int
+----
+2
+3
+4
+
+#correlated_scalar_subquery_count_agg_with_having
+#the having condition is kept as the normal filter condition, no need to pull up
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_35.cnt_plus_2 AS cnt_plus_2
+--Left Join: t1.t1_int = __scalar_sq_35.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_35
+------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int
+--------Filter: COUNT(UInt8(1)) > Int64(1)
+----------Projection: t2.t2_int, COUNT(UInt8(1))
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1
+----
+11 NULL
+22 NULL
+33 5
+44 NULL
+
+#correlated_scalar_subquery_count_agg_with_pull_up_having
+#the having condition needs to be pulled up and evaluated after the left outer join
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_37.__always_true IS NULL THEN Int64(2) AS cnt_plus_2 WHEN __scalar_sq_37.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_37.cnt_plus_2 END AS cnt_plus_2
+--Left Join: t1.t1_int = __scalar_sq_37.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_37
+------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int, COUNT(UInt8(1)), __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1
+----
+11 NULL
+22 2
+33 NULL
+44 2
+
+#correlated_scalar_subquery_count_agg_in_having
+query TT
+explain select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_39.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_39.COUNT(UInt8(1)) END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_39.COUNT(UInt8(1)), __scalar_sq_39.__always_true
+------Left Join: t1.t1_int = __scalar_sq_39.t2_int
+--------Aggregate: groupBy=[[t1.t1_int]], aggr=[[]]
+----------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_39
+----------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query I rowsort
+select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_nested_projection
+query TT
+explain select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_41.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_41.cnt END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_41.cnt, __scalar_sq_41.__always_true
+------Left Join: t1.t1_int = __scalar_sq_41.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_41
+----------Projection: COUNT(UInt8(1)) AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+
+query I rowsort
+select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_nested_subquery
+#pull up the deeply nested having condition
+query TT
+explain
+select t1.t1_int from t1 where (
+ select cnt_plus_one + 1 as cnt_plus_two from (
+ select cnt + 1 as cnt_plus_one from (
+ select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0
+ )
+ )
+) = 2
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_43.__always_true IS NULL THEN Int64(2) WHEN __scalar_sq_43.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_43.cnt_plus_two END = Int64(2)
+----Projection: t1.t1_int, __scalar_sq_43.cnt_plus_two, __scalar_sq_43.COUNT(UInt8(1)), __scalar_sq_43.__always_true
+------Left Join: t1.t1_int = __scalar_sq_43.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_43
+----------Projection: COUNT(UInt8(1)) + Int64(1) + Int64(1) AS cnt_plus_two, t2.t2_int, COUNT(UInt8(1)), __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query I rowsort
+select t1.t1_int from t1 where (
+ select cnt_plus_one + 1 as cnt_plus_two from (
+ select cnt + 1 as cnt_plus_one from (
+ select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0
+ )
+ )
+) = 2
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_case_when
+query TT
+explain
+select t1.t1_int from t1 where
+ (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_45.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_45.cnt END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_45.cnt, __scalar_sq_45.__always_true
+------Left Join: t1.t1_int = __scalar_sq_45.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_45
+----------Projection: CASE WHEN COUNT(UInt8(1)) = Int64(1) THEN Int64(NULL) ELSE COUNT(UInt8(1)) END AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+
+query I rowsort
+select t1.t1_int from t1 where
+ (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int) = 0
+----
+2
+4
+
+
+
+
+