You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/19 11:05:23 UTC

[arrow-datafusion] branch main updated: Port test in subqueries.rs from rust to sqllogictest (#6675)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 873d4178ab Port test in subqueries.rs from rust to sqllogictest  (#6675)
873d4178ab is described below

commit 873d4178ab3c98ca6f78ef553c8e33e12dd342fb
Author: zhenxing jiang <ji...@gmail.com>
AuthorDate: Mon Jun 19 19:05:17 2023 +0800

    Port test in subqueries.rs from rust to sqllogictest  (#6675)
    
    * port correlated_recursive_scalar_subquery to sql
    
    * port correlated_where_in to sql
    
    * port exists_subquery_with_same_table to sql
    
    * port in_subquery_with_same_table to sql
    
    * port in_subquery_nested_exist_subquery to sql
    
    * port invalid_scalar_subquery to sql
    
    * port subquery_not_allowed to sql
    
    * port non_aggregated_correlated_scalar_subquery to sql
    
    * port non_aggregated_correlated_scalar_subquery_with_limit to sql
    
    * port non_aggregated_correlated_scalar_subquery_with_single_row to sql
    
    * port non_equal_correlated_scalar_subquery to sql
    
    * port aggregated_correlated_scalar_subquery_with_extra_group_by_columns to sql
    
    * port support_agg_correlated_columns to sql
    
    * port support_agg_correlated_columns2 to sql
    
    * port support_join_correlated_columns to sql
    
    * port subquery_contains_join_contains_correlated_columns to sql
    
    * port subquery_contains_join_contains_sub_query_alias_correlated_columns to sql
    
    * port support_order_by_correlated_columns to sql
    
    * port exists_subquery_with_select_null to sql
    
    * port exists_subquery_with_limit to sql
    
    * port exists_subquery_with_limit0 to sql
    
    * port not_exists_subquery_with_limit0 to sql
    
    * port in_correlated_subquery_with_limit to sql
    
    * port in_non_correlated_subquery_with_limit to sql
    
    * port uncorrelated_scalar_subquery_with_limit0 to sql
    
    * port support_union_subquery to sql
    
    * port simple_uncorrelated_scalar_subquery to sql
    
    * port simple_uncorrelated_scalar_subquery2 to sql
    
    * port correlated_scalar_subquery_count_agg to sql
    
    * port correlated_scalar_subquery_count_agg2 to sql
    
    * port correlated_scalar_subquery_count_agg_with_alias to sql
    
    * port correlated_scalar_subquery_count_agg_complex_expr to sql
    
    * port correlated_scalar_subquery_count_agg_where_clause to sql
    
    * port correlated_scalar_subquery_count_agg_with_having to sql
    
    * port correlated_scalar_subquery_count_agg_with_pull_up_having to sql
    
    * port correlated_scalar_subquery_count_agg_in_having to sql
    
    * port correlated_scalar_subquery_count_agg_in_nested_projection to sql
    
    * port correlated_scalar_subquery_count_agg_in_nested_subquery to sql
    
    * port correlated_scalar_subquery_count_agg_in_case_when to sql
    
    * clippy fix
---
 datafusion/core/tests/sql/mod.rs                   |   76 --
 datafusion/core/tests/sql/subqueries.rs            | 1372 --------------------
 .../tests/sqllogictests/test_files/subquery.slt    |  724 ++++++++++-
 3 files changed, 723 insertions(+), 1449 deletions(-)

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 5fa4b2383b..ca0cfc3dbe 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -201,82 +201,6 @@ fn create_join_context(
     Ok(ctx)
 }
 
-fn create_sub_query_join_context(
-    column_outer: &str,
-    column_inner_left: &str,
-    column_inner_right: &str,
-    repartition_joins: bool,
-) -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_repartition_joins(repartition_joins)
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
-
-    let t0_schema = Arc::new(Schema::new(vec![
-        Field::new(column_outer, DataType::UInt32, true),
-        Field::new("t0_name", DataType::Utf8, true),
-        Field::new("t0_int", DataType::UInt32, true),
-    ]));
-    let t0_data = RecordBatch::try_new(
-        t0_schema,
-        vec![
-            Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
-            Arc::new(StringArray::from(vec![
-                Some("a"),
-                Some("b"),
-                Some("c"),
-                Some("d"),
-            ])),
-            Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
-        ],
-    )?;
-    ctx.register_batch("t0", t0_data)?;
-
-    let t1_schema = Arc::new(Schema::new(vec![
-        Field::new(column_inner_left, DataType::UInt32, true),
-        Field::new("t1_name", DataType::Utf8, true),
-        Field::new("t1_int", DataType::UInt32, true),
-    ]));
-    let t1_data = RecordBatch::try_new(
-        t1_schema,
-        vec![
-            Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
-            Arc::new(StringArray::from(vec![
-                Some("a"),
-                Some("b"),
-                Some("c"),
-                Some("d"),
-            ])),
-            Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
-        ],
-    )?;
-    ctx.register_batch("t1", t1_data)?;
-
-    let t2_schema = Arc::new(Schema::new(vec![
-        Field::new(column_inner_right, DataType::UInt32, true),
-        Field::new("t2_name", DataType::Utf8, true),
-        Field::new("t2_int", DataType::UInt32, true),
-    ]));
-    let t2_data = RecordBatch::try_new(
-        t2_schema,
-        vec![
-            Arc::new(UInt32Array::from(vec![11, 22, 44, 55])),
-            Arc::new(StringArray::from(vec![
-                Some("z"),
-                Some("y"),
-                Some("x"),
-                Some("w"),
-            ])),
-            Arc::new(UInt32Array::from(vec![3, 1, 3, 3])),
-        ],
-    )?;
-    ctx.register_batch("t2", t2_data)?;
-
-    Ok(ctx)
-}
-
 fn create_left_semi_anti_join_context_with_null_ids(
     column_left: &str,
     column_right: &str,
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 1fd627bc77..7d38b9173c 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -17,1109 +17,6 @@
 
 use super::*;
 use crate::sql::execute_to_batches;
-use datafusion::assert_batches_eq;
-use datafusion::prelude::SessionContext;
-use log::debug;
-
-#[tokio::test]
-async fn correlated_recursive_scalar_subquery() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_tpch_csv(&ctx, "customer").await?;
-    register_tpch_csv(&ctx, "orders").await?;
-    register_tpch_csv(&ctx, "lineitem").await?;
-
-    let sql = r#"
-select c_custkey from customer
-where c_acctbal < (
-    select sum(o_totalprice) from orders
-    where o_custkey = c_custkey
-    and o_totalprice < (
-            select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey
-    )
-) order by c_custkey;"#;
-
-    // assert plan
-    let dataframe = ctx.sql(sql).await.unwrap();
-    debug!("input:\n{}", dataframe.logical_plan().display_indent());
-
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let actual = format!("{}", plan.display_indent());
-    let expected =  "Sort: customer.c_custkey ASC NULLS LAST\
-    \n  Projection: customer.c_custkey\
-    \n    Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.SUM(orders.o_totalprice)\
-    \n      TableScan: customer projection=[c_custkey, c_acctbal]\
-    \n      SubqueryAlias: __scalar_sq_1\
-    \n        Projection: SUM(orders.o_totalprice), orders.o_custkey\
-    \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\
-    \n            Projection: orders.o_custkey, orders.o_totalprice\
-    \n              Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.price\
-    \n                TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\
-    \n                SubqueryAlias: __scalar_sq_2\
-    \n                  Projection: SUM(lineitem.l_extendedprice) AS price, lineitem.l_orderkey\
-    \n                    Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\
-    \n                      TableScan: lineitem projection=[l_orderkey, l_extendedprice]";
-    assert_eq!(actual, expected);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_where_in() -> Result<()> {
-    let orders = r#"1,3691,O,194029.55,1996-01-02,5-LOW,Clerk#000000951,0,
-65,1627,P,99763.79,1995-03-18,1-URGENT,Clerk#000000632,0,
-"#;
-    let lineitems = r#"1,15519,785,1,17,24386.67,0.04,0.02,N,O,1996-03-13,1996-02-12,1996-03-22,DELIVER IN PERSON,TRUCK,
-1,6731,732,2,36,58958.28,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,
-65,5970,481,1,26,48775.22,0.03,0.03,A,F,1995-04-20,1995-04-25,1995-05-13,NONE,TRUCK,
-65,7382,897,2,22,28366.36,0,0.05,N,O,1995-07-17,1995-06-04,1995-07-19,COLLECT COD,FOB,
-"#;
-
-    let ctx = SessionContext::new();
-    register_tpch_csv_data(&ctx, "orders", orders).await?;
-    register_tpch_csv_data(&ctx, "lineitem", lineitems).await?;
-
-    let sql = r#"select o_orderkey from orders
-where o_orderstatus in (
-    select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
-);"#;
-
-    // assert plan
-    let dataframe = ctx.sql(sql).await.unwrap();
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let actual = format!("{}", plan.display_indent());
-
-    let expected = "Projection: orders.o_orderkey\
-    \n  LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey\
-    \n    TableScan: orders projection=[o_orderkey, o_orderstatus]\
-    \n    SubqueryAlias: __correlated_sq_1\
-    \n      Projection: lineitem.l_linestatus, lineitem.l_orderkey\
-    \n        TableScan: lineitem projection=[l_orderkey, l_linestatus]";
-    assert_eq!(actual, expected);
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+",
-        "| o_orderkey |",
-        "+------------+",
-        "| 1          |",
-        "+------------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_same_table() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    // Subquery and outer query refer to the same table.
-    // It will not be rewritten to join because it is not a correlated subquery.
-    let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    Subquery: [t1_int:UInt32;N]",
-        "      Projection: t1.t1_int [t1_int:UInt32;N]",
-        "        Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "          TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn in_subquery_with_same_table() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    // Subquery and outer query refer to the same table.
-    // It will be rewritten to join because in-subquery has extra predicate(`t1.t1_id = __correlated_sq_1.t1_int`).
-    let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  LeftSemi Join: t1.t1_id = __correlated_sq_1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __correlated_sq_1 [t1_int:UInt32;N]",
-        "      Projection: t1.t1_int [t1_int:UInt32;N]",
-        "        Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "          TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn in_subquery_nested_exist_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t2_id FROM t2 WHERE EXISTS(select * from t1 WHERE t1.t1_int > t2.t2_int))";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
-        "      Projection: t2.t2_id [t2_id:UInt32;N]",
-        "        LeftSemi Join:  Filter: __correlated_sq_2.t1_int > t2.t2_int [t2_id:UInt32;N, t2_int:UInt32;N]",
-        "          TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
-        "          SubqueryAlias: __correlated_sq_2 [t1_int:UInt32;N]",
-        "            TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn invalid_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn subquery_not_allowed() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    // In/Exist Subquery is not allowed in ORDER BY clause.
-    let sql = "SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
-        &format!("{err:?}")
-    );
-
-    let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery_with_limit() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery must be aggregated to return at most one row"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn non_aggregated_correlated_scalar_subquery_with_single_row() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, (<subquery>) AS t2_int [t1_id:UInt32;N, t2_int:UInt32;N]",
-        "  Subquery: [t2_int:UInt32;N]",
-        "    Limit: skip=0, fetch=1 [t2_int:UInt32;N]",
-        "      Projection: t2.t2_int [t2_int:UInt32;N]",
-        "        Filter: t2.t2_int = outer_ref(t1.t1_int) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let sql = "SELECT t1_id from t1 where t1_int = (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id [t1_id:UInt32;N]",
-        "  Filter: t1.t1_int = (<subquery>) [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    Subquery: [t2_int:UInt32;N]",
-        "      Limit: skip=0, fetch=1 [t2_int:UInt32;N]",
-        "        Projection: t2.t2_int [t2_int:UInt32;N]",
-        "          Filter: t2.t2_int = outer_ref(t1.t1_int) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "            TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let sql = "SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_5.a AS t2_int [t1_id:UInt32;N, t2_int:Int64;N]",
-        "  Left Join: CAST(t1.t1_int AS Int64) = __scalar_sq_5.a [t1_id:UInt32;N, t1_int:UInt32;N, a:Int64;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_5 [a:Int64]",
-        "      Projection: Int64(1) AS a [a:Int64]",
-        "        EmptyRelation []",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // TODO infer nullability in the schema has bug
-    // // assert data
-    // let results = execute_to_batches(&ctx, sql).await;
-    // let expected = vec![
-    //     "+-------+--------+",
-    //     "| t1_id | t2_int |",
-    //     "+-------+--------+",
-    //     "| 22    |        |",
-    //     "| 33    |        |",
-    //     "| 11    | 1      |",
-    //     "| 44    |        |",
-    //     "+-------+--------+",
-    // ];
-    // assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn non_equal_correlated_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id < t1.t1_id) as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Correlated column is not allowed in predicate: t2.t2_id < outer_ref(t1.t1_id)"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() -> Result<()>
-{
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn support_agg_correlated_columns() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
-        "    Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id) [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
-        "      Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
-        "        Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn support_agg_correlated_columns2() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [COUNT(UInt8(1)):Int64;N]",
-        "    Projection: COUNT(UInt8(1)) [COUNT(UInt8(1)):Int64;N]",
-        "      Filter: CAST(SUM(outer_ref(t1.t1_int) + t2.t2_id) AS Int64) > Int64(0) [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
-        "        Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]] [COUNT(UInt8(1)):Int64;N, SUM(outer_ref(t1.t1_int) + t2.t2_id):UInt64;N]",
-        "          Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "            TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn support_join_correlated_columns() -> Result<()> {
-    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
-    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  Subquery: [Int64(1):Int64]",
-        "    Projection: Int64(1) [Int64(1):Int64]",
-        "      Inner Join:  Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn subquery_contains_join_contains_correlated_columns() -> Result<()> {
-    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
-    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "LeftSemi Join: t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  SubqueryAlias: __correlated_sq_1 [t2_name:Utf8;N]",
-        "    Projection: t2.t2_name [t2_name:Utf8;N]",
-        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
-        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "        SubqueryAlias: t2 [t2_id:UInt32;N, t2_name:Utf8;N]",
-        "          TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn subquery_contains_join_contains_sub_query_alias_correlated_columns() -> Result<()>
-{
-    let ctx = create_sub_query_join_context("t0_id", "t1_id", "t2_id", true)?;
-    let sql = "SELECT t0_id, t0_name FROM t0 WHERE EXISTS (select 1 from (SELECT * FROM t1 where t1.t1_id = t0.t0_id) as x INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as y ON(x.t1_id = y.t2_id))";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "LeftSemi Join: t0.t0_id = __correlated_sq_1.t1_id, t0.t0_name = __correlated_sq_1.t2_name [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  TableScan: t0 projection=[t0_id, t0_name] [t0_id:UInt32;N, t0_name:Utf8;N]",
-        "  SubqueryAlias: __correlated_sq_1 [t1_id:UInt32;N, t2_name:Utf8;N]",
-        "    Projection: x.t1_id, y.t2_name [t1_id:UInt32;N, t2_name:Utf8;N]",
-        "      Inner Join: x.t1_id = y.t2_id [t1_id:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N]",
-        "        SubqueryAlias: x [t1_id:UInt32;N]",
-        "          TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "        SubqueryAlias: y [t2_id:UInt32;N, t2_name:Utf8;N]",
-        "          TableScan: t2 projection=[t2_id, t2_name] [t2_id:UInt32;N, t2_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn support_order_by_correlated_columns() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id >= t1_id order by t1_id)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Sort: outer_ref(t1.t1_id) ASC NULLS LAST [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: t2.t2_id >= outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_select_null() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [NULL:Null;N]",
-        "    Projection: NULL [NULL:Null;N]",
-        "      EmptyRelation []",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_limit() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // de-correlated, limit is removed
-    let expected = vec![
-        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
-        "    TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+",
-        "| t1_id | t1_name |",
-        "+-------+---------+",
-        "| 11    | a       |",
-        "| 22    | b       |",
-        "| 44    | d       |",
-        "+-------+---------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn exists_subquery_with_limit0() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // de-correlated, limit is removed and replaced with EmptyRelation
-    let expected = vec![
-        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  EmptyRelation [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec!["++", "++"];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn not_exists_subquery_with_limit0() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // de-correlated, limit is removed and replaced with EmptyRelation
-    let expected = vec![
-        "LeftAnti Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  EmptyRelation [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+---------+",
-        "| t1_id | t1_name |",
-        "+-------+---------+",
-        "| 11    | a       |",
-        "| 22    | b       |",
-        "| 33    | c       |",
-        "| 44    | d       |",
-        "+-------+---------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn in_correlated_subquery_with_limit() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 where t1_name = t2_name limit 10)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // not de-correlated
-    let expected = vec![
-        "Filter: t1.t1_id IN (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N]",
-        "    Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
-        "      Projection: t2.t2_id [t2_id:UInt32;N]",
-        "        Filter: outer_ref(t1.t1_name) = t2.t2_name [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn in_non_correlated_subquery_with_limit() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql =
-        "SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // de-correlated, limit is kept
-    let expected = vec![
-        "LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
-        "    Limit: skip=0, fetch=10 [t2_id:UInt32;N]",
-        "      TableScan: t2 projection=[t2_id], fetch=10 [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn uncorrelated_scalar_subquery_with_limit0() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.t2_id AS t2_id [t1_id:UInt32;N, t2_id:UInt32;N]",
-        "  Left Join:  [t1_id:UInt32;N, t2_id:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    EmptyRelation [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+-------+",
-        "| t1_id | t2_id |",
-        "+-------+-------+",
-        "| 11    |       |",
-        "| 22    |       |",
-        "| 33    |       |",
-        "| 44    |       |",
-        "+-------+-------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn support_union_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS \
-                (SELECT * FROM t2 WHERE t2_id = t1_id UNION ALL \
-                SELECT * FROM t2 WHERE upper(t2_name) = upper(t1.t1_name))";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Filter: EXISTS (<subquery>) [t1_id:UInt32;N, t1_name:Utf8;N]",
-        "  Subquery: [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "    Union [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: t2.t2_id = outer_ref(t1.t1_id) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "      Projection: t2.t2_id, t2.t2_name, t2.t2_int [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "        Filter: upper(t2.t2_name) = upper(outer_ref(t1.t1_name)) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "          TableScan: t2 [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        "  TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn simple_uncorrelated_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select (select count(*) from t1) as b";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b [b:Int64;N]",
-        "  SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N]",
-        "    Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] [COUNT(UInt8(1)):Int64;N]",
-        "      TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec!["+---+", "| b |", "+---+", "| 4 |", "+---+"];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn simple_uncorrelated_scalar_subquery2() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select (select count(*) from t1) as b, (select count(1) from t2)";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b, __scalar_sq_2.COUNT(Int64(1)) AS COUNT(Int64(1)) [b:Int64;N, COUNT(Int64(1)):Int64;N]",
-        "  Left Join:  [COUNT(UInt8(1)):Int64;N, COUNT(Int64(1)):Int64;N]",
-        "    SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N]",
-        "      Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] [COUNT(UInt8(1)):Int64;N]",
-        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_2 [COUNT(Int64(1)):Int64;N]",
-        "      Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]] [COUNT(Int64(1)):Int64;N]",
-        "        TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+---+-----------------+",
-        "| b | COUNT(Int64(1)) |",
-        "+---+-----------------+",
-        "| 4 | 4               |",
-        "+---+-----------------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql =
-        "SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS COUNT(UInt8(1)) [t1_id:UInt32;N, COUNT(UInt8(1)):Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "      Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "        Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "          TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+-----------------+",
-        "| t1_id | COUNT(UInt8(1)) |",
-        "+-------+-----------------+",
-        "| 33    | 3               |",
-        "| 22    | 0               |",
-        "| 11    | 1               |",
-        "| 44    | 0               |",
-        "+-------+-----------------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg2() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END AS cnt [t1_id:UInt32;N, cnt:Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "      Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "        Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "          TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+-----+",
-        "| t1_id | cnt |",
-        "+-------+-----+",
-        "| 33    | 3   |",
-        "| 22    | 0   |",
-        "| 11    | 1   |",
-        "| 44    | 0   |",
-        "+-------+-----+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_alias() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) AS _cnt ELSE __scalar_sq_1._cnt END AS cnt [t1_id:UInt32;N, cnt:Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, _cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "      Projection: COUNT(UInt8(1)) AS _cnt, t2.t2_int, __always_true [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "        Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "          TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+-----+",
-        "| t1_id | cnt |",
-        "+-------+-----+",
-        "| 33    | 3   |",
-        "| 22    | 0   |",
-        "| 11    | 1   |",
-        "| 44    | 0   |",
-        "+-------+-----+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_complex_expr() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) AS _cnt ELSE __scalar_sq_1._cnt END AS _cnt [t1_id:UInt32;N, _cnt:Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, _cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "      Projection: COUNT(UInt8(1)) + Int64(2) AS _cnt, t2.t2_int, __always_true [_cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "        Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "          TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+------+",
-        "| t1_id | _cnt |",
-        "+-------+------+",
-        "| 11    | 3    |",
-        "| 22    | 2    |",
-        "| 33    | 5    |",
-        "| 44    | 2    |",
-        "+-------+------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_where_clause() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_int [t1_int:UInt32;N]",
-        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "    Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "      Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean;N]",
-        "        TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "        SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
-        "          Projection: COUNT(UInt8(1)), t2.t2_id, __always_true [COUNT(UInt8(1)):Int64;N, t2_id:UInt32;N, __always_true:Boolean]",
-        "            Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_id:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+",
-        "| t1_int |",
-        "+--------+",
-        "| 2      |",
-        "| 4      |",
-        "| 3      |",
-        "+--------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
 
 #[tokio::test]
 #[ignore]
@@ -1164,272 +61,3 @@ async fn correlated_scalar_subquery_sum_agg_bug() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_having() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // the having condition is kept as the normal filter condition, no need to pull up
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.cnt_plus_2 AS cnt_plus_2 [t1_id:UInt32;N, cnt_plus_2:Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, cnt_plus_2:Int64;N, t2_int:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [cnt_plus_2:Int64;N, t2_int:UInt32;N]",
-        "      Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int [cnt_plus_2:Int64;N, t2_int:UInt32;N]",
-        "        Filter: COUNT(UInt8(1)) > Int64(1) [t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N]",
-        "          Projection: t2.t2_int, COUNT(UInt8(1)) [t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N]",
-        "            Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+------------+",
-        "| t1_id | cnt_plus_2 |",
-        "+-------+------------+",
-        "| 11    |            |",
-        "| 22    |            |",
-        "| 33    | 5          |",
-        "| 44    |            |",
-        "+-------+------------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_with_pull_up_having() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // the having condition need to pull up and evaluated after the left out join
-    let expected = vec![
-        "Projection: t1.t1_id, CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) AS cnt_plus_2 WHEN __scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_1.cnt_plus_2 END AS cnt_plus_2 [t1_id:UInt32;N, cnt_plus_2:Int64;N]",
-        "  Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_id:UInt32;N, t1_int:UInt32;N, cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "    TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
-        "      Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int, COUNT(UInt8(1)), __always_true [cnt_plus_2:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
-        "        Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "          TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+------------+",
-        "| t1_id | cnt_plus_2 |",
-        "+-------+------------+",
-        "| 11    |            |",
-        "| 22    | 2          |",
-        "| 33    |            |",
-        "| 44    | 2          |",
-        "+-------+------------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_having() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_int [t1_int:UInt32;N]",
-        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.COUNT(UInt8(1)) END = Int64(0) [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "    Projection: t1.t1_int, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "      Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "        Aggregate: groupBy=[[t1.t1_int]], aggr=[[]] [t1_int:UInt32;N]",
-        "          TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
-        "        SubqueryAlias: __scalar_sq_1 [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "          Projection: COUNT(UInt8(1)), t2.t2_int, __always_true [COUNT(UInt8(1)):Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "            Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+",
-        "| t1_int |",
-        "+--------+",
-        "| 2      |",
-        "| 4      |",
-        "+--------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_nested_projection() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_int [t1_int:UInt32;N]",
-        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.cnt END = Int64(0) [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
-        "    Projection: t1.t1_int, __scalar_sq_1.cnt, __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
-        "      Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "        TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
-        "        SubqueryAlias: __scalar_sq_1 [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "          Projection: COUNT(UInt8(1)) AS cnt, t2.t2_int, __always_true [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "            Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+",
-        "| t1_int |",
-        "+--------+",
-        "| 2      |",
-        "| 4      |",
-        "+--------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_nested_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select t1.t1_int from t1 where \
-                        (select cnt_plus_one + 1 as cnt_plus_two from \
-                            (select cnt + 1 as cnt_plus_one from \
-                                (select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0)\
-                            )\
-                        ) = 2";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    // pull up the deeply nested having condition
-    let expected = vec![
-        "Projection: t1.t1_int [t1_int:UInt32;N]",
-        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(2) WHEN __scalar_sq_1.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_1.cnt_plus_two END = Int64(2) [t1_int:UInt32;N, cnt_plus_two:Int64;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "    Projection: t1.t1_int, __scalar_sq_1.cnt_plus_two, __scalar_sq_1.COUNT(UInt8(1)), __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt_plus_two:Int64;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "      Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean;N]",
-        "        TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
-        "        SubqueryAlias: __scalar_sq_1 [cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
-        "          Projection: COUNT(UInt8(1)) + Int64(1) + Int64(1) AS cnt_plus_two, t2.t2_int, COUNT(UInt8(1)), __always_true [cnt_plus_two:Int64;N, t2_int:UInt32;N, COUNT(UInt8(1)):Int64;N, __always_true:Boolean]",
-        "            Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+",
-        "| t1_int |",
-        "+--------+",
-        "| 2      |",
-        "| 4      |",
-        "+--------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn correlated_scalar_subquery_count_agg_in_case_when() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "select t1.t1_int from t1 where \
-                    (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int)\
-                    = 0";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_int [t1_int:UInt32;N]",
-        "  Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.cnt END = Int64(0) [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
-        "    Projection: t1.t1_int, __scalar_sq_1.cnt, __scalar_sq_1.__always_true [t1_int:UInt32;N, cnt:Int64;N, __always_true:Boolean;N]",
-        "      Left Join: t1.t1_int = __scalar_sq_1.t2_int [t1_int:UInt32;N, cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean;N]",
-        "        TableScan: t1 projection=[t1_int] [t1_int:UInt32;N]",
-        "        SubqueryAlias: __scalar_sq_1 [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "          Projection: CASE WHEN COUNT(UInt8(1)) = Int64(1) THEN Int64(NULL) ELSE COUNT(UInt8(1)) END AS cnt, t2.t2_int, __always_true [cnt:Int64;N, t2_int:UInt32;N, __always_true:Boolean]",
-        "            Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]] [t2_int:UInt32;N, __always_true:Boolean, COUNT(UInt8(1)):Int64;N]",
-        "              TableScan: t2 projection=[t2_int] [t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------+",
-        "| t1_int |",
-        "+--------+",
-        "| 2      |",
-        "| 4      |",
-        "+--------+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
index b47901b277..1961a5840d 100644
--- a/datafusion/core/tests/sqllogictests/test_files/subquery.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -19,7 +19,18 @@
 ## Subquery Tests
 #############
 
-# two tables for subquery
+
+#############
+## Setup test data table
+#############
+# there tables for subquery
+statement ok
+CREATE TABLE t0(t0_id INT, t0_name TEXT, t0_int INT) AS VALUES
+(11, 'o', 6),
+(22, 'p', 7),
+(33, 'q', 8),
+(44, 'r', 9);
+
 statement ok
 CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
 (11, 'a', 1),
@@ -34,6 +45,50 @@ CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
 (44, 'x', 3),
 (55, 'w', 3);
 
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS customer (
+        c_custkey BIGINT,
+        c_name VARCHAR,
+        c_address VARCHAR,
+        c_nationkey BIGINT,
+        c_phone VARCHAR,
+        c_acctbal DECIMAL(15, 2),
+        c_mktsegment VARCHAR,
+        c_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/customer.csv';
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS orders (
+        o_orderkey BIGINT,
+        o_custkey BIGINT,
+        o_orderstatus VARCHAR,
+        o_totalprice DECIMAL(15, 2),
+        o_orderdate DATE,
+        o_orderpriority VARCHAR,
+        o_clerk VARCHAR,
+        o_shippriority INTEGER,
+        o_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/orders.csv';
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
+        l_orderkey BIGINT,
+        l_partkey BIGINT,
+        l_suppkey BIGINT,
+        l_linenumber INTEGER,
+        l_quantity DECIMAL(15, 2),
+        l_extendedprice DECIMAL(15, 2),
+        l_discount DECIMAL(15, 2),
+        l_tax DECIMAL(15, 2),
+        l_returnflag VARCHAR,
+        l_linestatus VARCHAR,
+        l_shipdate DATE,
+        l_commitdate DATE,
+        l_receiptdate DATE,
+        l_shipinstruct VARCHAR,
+        l_shipmode VARCHAR,
+        l_comment VARCHAR,
+) STORED AS CSV DELIMITER ',' WITH HEADER ROW LOCATION 'tests/tpch-csv/lineitem.csv';
 
 # in_subquery_to_join_with_correlated_outer_filter
 query ITI rowsort
@@ -248,3 +303,670 @@ SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t
 22 1
 33 NULL
 44 NULL
+
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+# correlated_recursive_scalar_subquery
+query TT
+explain select c_custkey from customer
+where c_acctbal < (
+    select sum(o_totalprice) from orders
+    where o_custkey = c_custkey
+    and o_totalprice < (
+            select sum(l_extendedprice) as price from lineitem where l_orderkey = o_orderkey
+    )
+) order by c_custkey;
+----
+logical_plan
+Sort: customer.c_custkey ASC NULLS LAST
+--Projection: customer.c_custkey
+----Inner Join: customer.c_custkey = __scalar_sq_10.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_10.SUM(orders.o_totalprice)
+------TableScan: customer projection=[c_custkey, c_acctbal]
+------SubqueryAlias: __scalar_sq_10
+--------Projection: SUM(orders.o_totalprice), orders.o_custkey
+----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]
+------------Projection: orders.o_custkey, orders.o_totalprice
+--------------Inner Join: orders.o_orderkey = __scalar_sq_11.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_11.price
+----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
+----------------SubqueryAlias: __scalar_sq_11
+------------------Projection: SUM(lineitem.l_extendedprice) AS price, lineitem.l_orderkey
+--------------------Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]
+----------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice]
+
+# correlated_where_in
+query TT
+explain select o_orderkey from orders
+where o_orderstatus in (
+    select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
+);
+----
+logical_plan
+Projection: orders.o_orderkey
+--LeftSemi Join: orders.o_orderstatus = __correlated_sq_6.l_linestatus, orders.o_orderkey = __correlated_sq_6.l_orderkey
+----TableScan: orders projection=[o_orderkey, o_orderstatus]
+----SubqueryAlias: __correlated_sq_6
+------Projection: lineitem.l_linestatus, lineitem.l_orderkey
+--------TableScan: lineitem projection=[l_orderkey, l_linestatus]
+
+query I rowsort
+select o_orderkey from orders
+where o_orderstatus in (
+    select l_linestatus from lineitem where l_orderkey = orders.o_orderkey
+);
+----
+2
+3
+
+#exists_subquery_with_same_table
+#Subquery and outer query refer to the same table.
+#It will not be rewritten to join because it is not a correlated subquery.
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: t1.t1_int
+------Filter: t1.t1_id > t1.t1_int
+--------TableScan: t1
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+
+
+#in_subquery_with_same_table
+#Subquery and outer query refer to the same table.
+#It will be rewritten to join because in-subquery has extra predicate(`t1.t1_id = __correlated_sq_10.t1_int`).
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t1_int FROM t1 WHERE t1.t1_id > t1.t1_int)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_10.t1_int
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+--SubqueryAlias: __correlated_sq_10
+----Projection: t1.t1_int
+------Filter: t1.t1_id > t1.t1_int
+--------TableScan: t1 projection=[t1_id, t1_int]
+
+#in_subquery_nested_exist_subquery
+query TT
+explain SELECT t1_id, t1_name, t1_int FROM t1 WHERE t1_id IN(SELECT t2_id FROM t2 WHERE EXISTS(select * from t1 WHERE t1.t1_int > t2.t2_int))
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_11.t2_id
+--TableScan: t1 projection=[t1_id, t1_name, t1_int]
+--SubqueryAlias: __correlated_sq_11
+----Projection: t2.t2_id
+------LeftSemi Join:  Filter: __correlated_sq_12.t1_int > t2.t2_int
+--------TableScan: t2 projection=[t2_id, t2_int]
+--------SubqueryAlias: __correlated_sq_12
+----------TableScan: t1 projection=[t1_int]
+
+#invalid_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name
+SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1
+
+#subquery_not_allowed
+#In/Exist Subquery is not allowed in ORDER BY clause.
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: In/Exist subquery can only be used in Projection, Filter, Window functions, Aggregate and Join plan nodes
+SELECT t1_id, t1_name, t1_int FROM t1 order by t1_int in (SELECT t2_int FROM t2 WHERE t1.t1_id > t1.t1_int)
+
+#non_aggregated_correlated_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int) as t2_int from t1
+
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1_int group by t2_int) as t2_int from t1
+
+#non_aggregated_correlated_scalar_subquery_with_limit
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated scalar subquery must be aggregated to return at most one row
+SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 2) as t2_int from t1
+
+#non_aggregated_correlated_scalar_subquery_with_single_row
+query TT
+explain SELECT t1_id, (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1) as t2_int from t1
+----
+logical_plan
+Projection: t1.t1_id, (<subquery>) AS t2_int
+--Subquery:
+----Limit: skip=0, fetch=1
+------Projection: t2.t2_int
+--------Filter: t2.t2_int = outer_ref(t1.t1_int)
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id]
+
+query TT
+explain SELECT t1_id from t1 where t1_int = (SELECT t2_int FROM t2 WHERE t2.t2_int = t1.t1_int limit 1)
+----
+logical_plan
+Projection: t1.t1_id
+--Filter: t1.t1_int = (<subquery>)
+----Subquery:
+------Limit: skip=0, fetch=1
+--------Projection: t2.t2_int
+----------Filter: t2.t2_int = outer_ref(t1.t1_int)
+------------TableScan: t2
+----TableScan: t1 projection=[t1_id, t1_int]
+
+query TT
+explain SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_16.a AS t2_int
+--Left Join: CAST(t1.t1_int AS Int64) = __scalar_sq_16.a
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_16
+------Projection: Int64(1) AS a
+--------EmptyRelation
+
+query II rowsort
+SELECT t1_id, (SELECT a FROM (select 1 as a) WHERE a = t1.t1_int) as t2_int from t1
+----
+11 1
+22 NULL
+33 NULL
+44 NULL
+
+#non_equal_correlated_scalar_subquery
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: Correlated column is not allowed in predicate: t2\.t2_id < outer_ref\(t1\.t1_id\)
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id < t1.t1_id) as t2_sum from t1
+
+#aggregated_correlated_scalar_subquery_with_extra_group_by_columns
+statement error DataFusion error: check_analyzed_plan\ncaused by\nError during planning: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_name) as t2_sum from t1
+
+#support_agg_correlated_columns
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT sum(t1.t1_int + t2.t2_id) FROM t2 WHERE t1.t1_name = t2.t2_name)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: SUM(outer_ref(t1.t1_int) + t2.t2_id)
+------Aggregate: groupBy=[[]], aggr=[[SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+--------Filter: outer_ref(t1.t1_name) = t2.t2_name
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#support_agg_correlated_columns2
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE t1.t1_name = t2.t2_name having sum(t1_int + t2_id) >0)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: COUNT(UInt8(1))
+------Filter: SUM(outer_ref(t1.t1_int) + t2.t2_id) > Int64(0)
+--------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(outer_ref(t1.t1_int) + t2.t2_id)]]
+----------Filter: outer_ref(t1.t1_name) = t2.t2_name
+------------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#support_join_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN t2 ON(t1.t1_id = t2.t2_id and t1.t1_name = t0.t0_name))
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: Int64(1)
+------Inner Join:  Filter: t1.t1_id = t2.t2_id AND t1.t1_name = outer_ref(t0.t0_name)
+--------TableScan: t1
+--------TableScan: t2
+--TableScan: t0 projection=[t0_id, t0_name]
+
+#subquery_contains_join_contains_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (SELECT 1 FROM t1 INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as t2 ON(t1.t1_id = t2.t2_id ))
+----
+logical_plan
+LeftSemi Join: t0.t0_name = __correlated_sq_19.t2_name
+--TableScan: t0 projection=[t0_id, t0_name]
+--SubqueryAlias: __correlated_sq_19
+----Projection: t2.t2_name
+------Inner Join: t1.t1_id = t2.t2_id
+--------TableScan: t1 projection=[t1_id]
+--------SubqueryAlias: t2
+----------TableScan: t2 projection=[t2_id, t2_name]
+
+#subquery_contains_join_contains_sub_query_alias_correlated_columns
+query TT
+explain SELECT t0_id, t0_name FROM t0 WHERE EXISTS (select 1 from (SELECT * FROM t1 where t1.t1_id = t0.t0_id) as x INNER JOIN (select * from t2 where t2.t2_name = t0.t0_name) as y ON(x.t1_id = y.t2_id))
+----
+logical_plan
+LeftSemi Join: t0.t0_id = __correlated_sq_20.t1_id, t0.t0_name = __correlated_sq_20.t2_name
+--TableScan: t0 projection=[t0_id, t0_name]
+--SubqueryAlias: __correlated_sq_20
+----Projection: x.t1_id, y.t2_name
+------Inner Join: x.t1_id = y.t2_id
+--------SubqueryAlias: x
+----------TableScan: t1 projection=[t1_id]
+--------SubqueryAlias: y
+----------TableScan: t2 projection=[t2_id, t2_name]
+
+#support_order_by_correlated_columns
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id >= t1_id order by t1_id)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Sort: outer_ref(t1.t1_id) ASC NULLS LAST
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: t2.t2_id >= outer_ref(t1.t1_id)
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#exists_subquery_with_select_null
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT NULL)
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Projection: NULL
+------EmptyRelation
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#exists_subquery_with_limit
+#de-correlated, limit is removed
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_25.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--SubqueryAlias: __correlated_sq_25
+----TableScan: t2 projection=[t2_id]
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 1)
+----
+11 a
+22 b
+44 d
+
+#exists_subquery_with_limit0
+#de-correlated, limit is removed and replaced with EmptyRelation
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_27.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--EmptyRelation
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+
+
+#not_exists_subquery_with_limit0
+#de-correlated, limit is removed and replaced with EmptyRelation
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+logical_plan
+LeftAnti Join: t1.t1_id = __correlated_sq_29.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--EmptyRelation
+
+query IT rowsort
+SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id limit 0)
+----
+11 a
+22 b
+33 c
+44 d
+
+#in_correlated_subquery_with_limit
+#not de-correlated
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 where t1_name = t2_name limit 10)
+----
+logical_plan
+Filter: t1.t1_id IN (<subquery>)
+--Subquery:
+----Limit: skip=0, fetch=10
+------Projection: t2.t2_id
+--------Filter: outer_ref(t1.t1_name) = t2.t2_name
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#in_non_correlated_subquery_with_limit
+#de-correlated, limit is kept
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE t1_id in (SELECT t2_id FROM t2 limit 10)
+----
+logical_plan
+LeftSemi Join: t1.t1_id = __correlated_sq_33.t2_id
+--TableScan: t1 projection=[t1_id, t1_name]
+--SubqueryAlias: __correlated_sq_33
+----Limit: skip=0, fetch=10
+------TableScan: t2 projection=[t2_id], fetch=10
+
+
+#uncorrelated_scalar_subquery_with_limit0
+query TT
+explain SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_18.t2_id AS t2_id
+--Left Join:
+----TableScan: t1 projection=[t1_id]
+----EmptyRelation
+
+query II rowsort
+SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
+----
+11 NULL
+22 NULL
+33 NULL
+44 NULL
+
+#support_union_subquery
+query TT
+explain SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT * FROM t2 WHERE t2_id = t1_id UNION ALL SELECT * FROM t2 WHERE upper(t2_name) = upper(t1.t1_name))
+----
+logical_plan
+Filter: EXISTS (<subquery>)
+--Subquery:
+----Union
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: t2.t2_id = outer_ref(t1.t1_id)
+----------TableScan: t2
+------Projection: t2.t2_id, t2.t2_name, t2.t2_int
+--------Filter: upper(t2.t2_name) = upper(outer_ref(t1.t1_name))
+----------TableScan: t2
+--TableScan: t1 projection=[t1_id, t1_name]
+
+#simple_uncorrelated_scalar_subquery
+query TT
+explain select (select count(*) from t1) as b
+----
+logical_plan
+Projection: __scalar_sq_20.COUNT(UInt8(1)) AS b
+--SubqueryAlias: __scalar_sq_20
+----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+------TableScan: t1 projection=[t1_id]
+
+#simple_uncorrelated_scalar_subquery2
+query TT
+explain select (select count(*) from t1) as b, (select count(1) from t2)
+----
+logical_plan
+Projection: __scalar_sq_21.COUNT(UInt8(1)) AS b, __scalar_sq_22.COUNT(Int64(1)) AS COUNT(Int64(1))
+--Left Join:
+----SubqueryAlias: __scalar_sq_21
+------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+--------TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_22
+------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
+--------TableScan: t2 projection=[t2_id]
+
+query II
+select (select count(*) from t1) as b, (select count(1) from t2)
+----
+4 4
+
+#correlated_scalar_subquery_count_agg
+query TT
+explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_25.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_25.COUNT(UInt8(1)) END AS COUNT(UInt8(1))
+--Left Join: t1.t1_int = __scalar_sq_25.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_25
+------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+
+#correlated_scalar_subquery_count_agg2
+query TT
+explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_27.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_27.COUNT(UInt8(1)) END AS cnt
+--Left Join: t1.t1_int = __scalar_sq_27.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_27
+------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+#correlated_scalar_subquery_count_agg_with_alias
+query TT
+explain SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_29.__always_true IS NULL THEN Int64(0) AS _cnt ELSE __scalar_sq_29._cnt END AS cnt
+--Left Join: t1.t1_int = __scalar_sq_29.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_29
+------Projection: COUNT(UInt8(1)) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
+----
+11 1
+22 0
+33 3
+44 0
+
+#correlated_scalar_subquery_count_agg_complex_expr
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_31.__always_true IS NULL THEN Int64(2) AS _cnt ELSE __scalar_sq_31._cnt END AS _cnt
+--Left Join: t1.t1_int = __scalar_sq_31.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_31
+------Projection: COUNT(UInt8(1)) + Int64(2) AS _cnt, t2.t2_int, __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as _cnt FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
+----
+11 3
+22 2
+33 5
+44 2
+
+#correlated_scalar_subquery_count_agg_where_clause
+query TT
+explain select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_33.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_33.COUNT(UInt8(1)) END < CAST(t1.t1_int AS Int64)
+----Projection: t1.t1_int, __scalar_sq_33.COUNT(UInt8(1)), __scalar_sq_33.__always_true
+------Left Join: t1.t1_id = __scalar_sq_33.t2_id
+--------TableScan: t1 projection=[t1_id, t1_int]
+--------SubqueryAlias: __scalar_sq_33
+----------Projection: COUNT(UInt8(1)), t2.t2_id, __always_true
+------------Aggregate: groupBy=[[t2.t2_id, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_id]
+
+query I rowsort
+select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int
+----
+2
+3
+4
+
+#correlated_scalar_subquery_count_agg_with_having
+#the having condition is kept as the normal filter condition, no need to pull up
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_35.cnt_plus_2 AS cnt_plus_2
+--Left Join: t1.t1_int = __scalar_sq_35.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_35
+------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int
+--------Filter: COUNT(UInt8(1)) > Int64(1)
+----------Projection: t2.t2_int, COUNT(UInt8(1))
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) >1) from t1
+----
+11 NULL
+22 NULL
+33 5
+44 NULL
+
+#correlated_scalar_subquery_count_agg_with_pull_up_having
+#the having condition need to pull up and evaluated after the left out join
+query TT
+explain SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1
+----
+logical_plan
+Projection: t1.t1_id, CASE WHEN __scalar_sq_37.__always_true IS NULL THEN Int64(2) AS cnt_plus_2 WHEN __scalar_sq_37.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_37.cnt_plus_2 END AS cnt_plus_2
+--Left Join: t1.t1_int = __scalar_sq_37.t2_int
+----TableScan: t1 projection=[t1_id, t1_int]
+----SubqueryAlias: __scalar_sq_37
+------Projection: COUNT(UInt8(1)) + Int64(2) AS cnt_plus_2, t2.t2_int, COUNT(UInt8(1)), __always_true
+--------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+----------TableScan: t2 projection=[t2_int]
+
+query II rowsort
+SELECT t1_id, (SELECT count(*) + 2 as cnt_plus_2 FROM t2 WHERE t2.t2_int = t1.t1_int having count(*) = 0) from t1
+----
+11 NULL
+22 2
+33 NULL
+44 2
+
+#correlated_scalar_subquery_count_agg_in_having
+query TT
+explain select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_39.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_39.COUNT(UInt8(1)) END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_39.COUNT(UInt8(1)), __scalar_sq_39.__always_true
+------Left Join: t1.t1_int = __scalar_sq_39.t2_int
+--------Aggregate: groupBy=[[t1.t1_int]], aggr=[[]]
+----------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_39
+----------Projection: COUNT(UInt8(1)), t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query I rowsort
+select t1.t1_int from t1 group by t1.t1_int having (select count(*) from t2 where t1.t1_int = t2.t2_int) = 0
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_nested_projection
+query TT
+explain select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_41.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_41.cnt END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_41.cnt, __scalar_sq_41.__always_true
+------Left Join: t1.t1_int = __scalar_sq_41.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_41
+----------Projection: COUNT(UInt8(1)) AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+
+query I rowsort
+select t1.t1_int from t1 where (select cnt from (select count(*) as cnt, sum(t2_int) from t2 where t1.t1_int = t2.t2_int)) = 0
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_nested_subquery
+#pull up the deeply nested having condition
+query TT
+explain
+select t1.t1_int from t1 where (
+    select cnt_plus_one + 1 as cnt_plus_two from (
+        select cnt + 1 as cnt_plus_one from (
+            select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0
+        )
+    )
+) = 2
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_43.__always_true IS NULL THEN Int64(2) WHEN __scalar_sq_43.COUNT(UInt8(1)) != Int64(0) THEN NULL ELSE __scalar_sq_43.cnt_plus_two END = Int64(2)
+----Projection: t1.t1_int, __scalar_sq_43.cnt_plus_two, __scalar_sq_43.COUNT(UInt8(1)), __scalar_sq_43.__always_true
+------Left Join: t1.t1_int = __scalar_sq_43.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_43
+----------Projection: COUNT(UInt8(1)) + Int64(1) + Int64(1) AS cnt_plus_two, t2.t2_int, COUNT(UInt8(1)), __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+query I rowsort
+select t1.t1_int from t1 where (
+    select cnt_plus_one + 1 as cnt_plus_two from (
+        select cnt + 1 as cnt_plus_one from (
+            select count(*) as cnt, sum(t2_int) s from t2 where t1.t1_int = t2.t2_int having cnt = 0
+        )
+    )
+) = 2
+----
+2
+4
+
+#correlated_scalar_subquery_count_agg_in_case_when
+query TT
+explain
+select t1.t1_int from t1 where
+       (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int) = 0
+----
+logical_plan
+Projection: t1.t1_int
+--Filter: CASE WHEN __scalar_sq_45.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_45.cnt END = Int64(0)
+----Projection: t1.t1_int, __scalar_sq_45.cnt, __scalar_sq_45.__always_true
+------Left Join: t1.t1_int = __scalar_sq_45.t2_int
+--------TableScan: t1 projection=[t1_int]
+--------SubqueryAlias: __scalar_sq_45
+----------Projection: CASE WHEN COUNT(UInt8(1)) = Int64(1) THEN Int64(NULL) ELSE COUNT(UInt8(1)) END AS cnt, t2.t2_int, __always_true
+------------Aggregate: groupBy=[[t2.t2_int, Boolean(true) AS __always_true]], aggr=[[COUNT(UInt8(1))]]
+--------------TableScan: t2 projection=[t2_int]
+
+
+query I rowsort
+select t1.t1_int from t1 where
+       (select case when count(*) = 1 then null else count(*) end as cnt from t2 where t2.t2_int = t1.t1_int) = 0
+----
+2
+4
+
+
+
+
+