You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/30 14:24:35 UTC

[arrow-datafusion] branch master updated: Move subquery alias assignment onto rules (#4767)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new cf45eb902 Move subquery alias assignment onto rules (#4767)
cf45eb902 is described below

commit cf45eb9020092943b96653d70fafb143cc362e19
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Fri Dec 30 14:24:30 2022 +0000

    Move subquery alias assignment onto rules (#4767)
---
 benchmarks/expected-plans/q11.txt                  |   4 +-
 benchmarks/expected-plans/q15.txt                  |   4 +-
 benchmarks/expected-plans/q16.txt                  |   4 +-
 benchmarks/expected-plans/q17.txt                  |  13 +-
 benchmarks/expected-plans/q18.txt                  |   4 +-
 benchmarks/expected-plans/q2.txt                   |   4 +-
 benchmarks/expected-plans/q20.txt                  |  14 +-
 benchmarks/expected-plans/q22.txt                  |   4 +-
 benchmarks/src/bin/tpch.rs                         |  12 +-
 datafusion/core/tests/sql/subqueries.rs            | 226 ++++++++++-----------
 datafusion/optimizer/src/{lib.rs => alias.rs}      |  51 +++--
 datafusion/optimizer/src/decorrelate_where_in.rs   |  97 +++++----
 datafusion/optimizer/src/lib.rs                    |   1 +
 datafusion/optimizer/src/optimizer.rs              |  15 --
 .../optimizer/src/scalar_subquery_to_join.rs       |  73 +++----
 datafusion/optimizer/tests/integration-test.rs     |   4 +-
 16 files changed, 258 insertions(+), 272 deletions(-)

diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt
index bb5493828..7d8e14548 100644
--- a/benchmarks/expected-plans/q11.txt
+++ b/benchmarks/expected-plans/q11.txt
@@ -1,6 +1,6 @@
 Sort: value DESC NULLS FIRST
   Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
-    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))
+    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
       CrossJoin:
         Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
           Inner Join: supplier.s_nationkey = nation.n_nationkey
@@ -9,7 +9,7 @@ Sort: value DESC NULLS FIRST
               TableScan: supplier projection=[s_suppkey, s_nationkey]
             Filter: nation.n_name = Utf8("GERMANY")
               TableScan: nation projection=[n_nationkey, n_name]
-        SubqueryAlias: __sq_1
+        SubqueryAlias: __scalar_sq_1
           Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
             Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
               Inner Join: supplier.s_nationkey = nation.n_nationkey
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index 96401dd7b..f4e053f8d 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -1,7 +1,7 @@
 EmptyRelation
 Sort: supplier.s_suppkey ASC NULLS LAST
   Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue
-    Inner Join: revenue0.total_revenue = __sq_1.__value
+    Inner Join: revenue0.total_revenue = __scalar_sq_1.__value
       Inner Join: supplier.s_suppkey = revenue0.supplier_no
         TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
         SubqueryAlias: revenue0
@@ -10,7 +10,7 @@ Sort: supplier.s_suppkey ASC NULLS LAST
               Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
                 Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
                   TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
-      SubqueryAlias: __sq_1
+      SubqueryAlias: __scalar_sq_1
         Projection: MAX(revenue0.total_revenue) AS __value
           Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
             SubqueryAlias: revenue0
diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt
index 60ef26933..6af486a2a 100644
--- a/benchmarks/expected-plans/q16.txt
+++ b/benchmarks/expected-plans/q16.txt
@@ -3,12 +3,12 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS
     Projection: group_alias_0 AS part.p_brand, group_alias_1 AS part.p_type, group_alias_2 AS part.p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
       Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], aggr=[[COUNT(alias1)]]
         Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], aggr=[[]]
-          LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
+          LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
             Inner Join: partsupp.ps_partkey = part.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey]
               Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
                 TableScan: part projection=[p_partkey, p_brand, p_type, p_size]
-            SubqueryAlias: __sq_1
+            SubqueryAlias: __correlated_sq_1
               Projection: supplier.s_suppkey AS s_suppkey
                 Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
                   TableScan: supplier projection=[s_suppkey, s_comment]
diff --git a/benchmarks/expected-plans/q17.txt b/benchmarks/expected-plans/q17.txt
index 17b8e9698..755311c5e 100644
--- a/benchmarks/expected-plans/q17.txt
+++ b/benchmarks/expected-plans/q17.txt
@@ -1,11 +1,12 @@
-Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / Decimal128(Some(7000000000000000195487369212723200),38,33) AS avg_yearly
+Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
-    Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value
-      Inner Join: part.p_partkey = __sq_1.l_partkey
+    Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
+      Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
         Inner Join: lineitem.l_partkey = part.p_partkey
           TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
           Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
             TableScan: part projection=[p_partkey, p_brand, p_container]
-        Projection: lineitem.l_partkey, Decimal128(Some(200000000000000000000),38,21) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
-            TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
\ No newline at end of file
+        SubqueryAlias: __scalar_sq_1
+          Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
+            Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
+              TableScan: lineitem projection=[l_partkey, l_quantity]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt
index 4017722c5..639598725 100644
--- a/benchmarks/expected-plans/q18.txt
+++ b/benchmarks/expected-plans/q18.txt
@@ -1,13 +1,13 @@
 Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
   Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
     Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
-      LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey
+      LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
         Inner Join: orders.o_orderkey = lineitem.l_orderkey
           Inner Join: customer.c_custkey = orders.o_custkey
             TableScan: customer projection=[c_custkey, c_name]
             TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
           TableScan: lineitem projection=[l_orderkey, l_quantity]
-        SubqueryAlias: __sq_1
+        SubqueryAlias: __correlated_sq_1
           Projection: lineitem.l_orderkey AS l_orderkey
             Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
               Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
diff --git a/benchmarks/expected-plans/q2.txt b/benchmarks/expected-plans/q2.txt
index 34fb1e09a..571c320e9 100644
--- a/benchmarks/expected-plans/q2.txt
+++ b/benchmarks/expected-plans/q2.txt
@@ -1,7 +1,7 @@
 Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
   Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
     Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
-      Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value
+      Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
         Inner Join: nation.n_regionkey = region.r_regionkey
           Inner Join: supplier.s_nationkey = nation.n_nationkey
             Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
@@ -13,7 +13,7 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie
             TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
           Filter: region.r_name = Utf8("EUROPE")
             TableScan: region projection=[r_regionkey, r_name]
-        SubqueryAlias: __sq_1
+        SubqueryAlias: __scalar_sq_1
           Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
             Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
               Inner Join: nation.n_regionkey = region.r_regionkey
diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt
index b2676f61f..b7ecb9a09 100644
--- a/benchmarks/expected-plans/q20.txt
+++ b/benchmarks/expected-plans/q20.txt
@@ -1,21 +1,21 @@
 Sort: supplier.s_name ASC NULLS LAST
   Projection: supplier.s_name, supplier.s_address
-    LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey
+    LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
       Inner Join: supplier.s_nationkey = nation.n_nationkey
         TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
         Filter: nation.n_name = Utf8("CANADA")
           TableScan: nation projection=[n_nationkey, n_name]
-      SubqueryAlias: __sq_1
+      SubqueryAlias: __correlated_sq_1
         Projection: partsupp.ps_suppkey AS ps_suppkey
-          Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
-            Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
-              LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey
+          Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
+            Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
+              LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
                 TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
-                SubqueryAlias: __sq_2
+                SubqueryAlias: __correlated_sq_2
                   Projection: part.p_partkey AS p_partkey
                     Filter: part.p_name LIKE Utf8("forest%")
                       TableScan: part projection=[p_partkey, p_name]
-              SubqueryAlias: __sq_3
+              SubqueryAlias: __scalar_sq_1
                 Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
                   Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
                     Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
diff --git a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt
index 82060bd59..0fd7a590a 100644
--- a/benchmarks/expected-plans/q22.txt
+++ b/benchmarks/expected-plans/q22.txt
@@ -3,13 +3,13 @@ Sort: custsale.cntrycode ASC NULLS LAST
     Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
       SubqueryAlias: custsale
         Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
-          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value
+          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
             CrossJoin:
               LeftAnti Join: customer.c_custkey = orders.o_custkey
                 Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
                   TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
                 TableScan: orders projection=[o_custkey]
-              SubqueryAlias: __sq_1
+              SubqueryAlias: __scalar_sq_1
                 Projection: AVG(customer.c_acctbal) AS __value
                   Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
                     Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1b1bb876c..bff7999cd 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -594,17 +594,7 @@ mod tests {
         expected_plan(16).await
     }
 
-    /// This query produces different plans depending on operating system. The difference is
-    /// due to re-writing the following expression:
-    ///
-    /// `sum(l_extendedprice) / 7.0 as avg_yearly`
-    ///
-    /// Linux:   Decimal128(Some(7000000000000000195487369212723200),38,33)
-    /// Windows: Decimal128(Some(6999999999999999042565864605876224),38,33)
-    ///
-    /// See https://github.com/apache/arrow-datafusion/issues/3791
-    #[tokio::test]
-    #[ignore]
+    #[tokio::test]
     async fn q17_expected_plan() -> Result<()> {
         expected_plan(17).await
     }
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 3a4dadd7f..3fff5ba3e 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -50,21 +50,21 @@ where c_acctbal < (
 
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = "Sort: customer.c_custkey ASC NULLS LAST\
-    \n  Projection: customer.c_custkey\
-    \n    Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_1.__value\
-    \n      Inner Join: customer.c_custkey = __sq_1.o_custkey\
-    \n        TableScan: customer projection=[c_custkey, c_acctbal]\
-    \n        SubqueryAlias: __sq_1\
-    \n          Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\
-    \n            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\
-    \n              Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_2.__value\
-    \n                Inner Join: orders.o_orderkey = __sq_2.l_orderkey\
-    \n                  TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\
-    \n                  SubqueryAlias: __sq_2\
-    \n                    Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\
-    \n                      Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\
-    \n                        TableScan: lineitem projection=[l_orderkey, l_extendedprice]";
+    let expected = r#"Sort: customer.c_custkey ASC NULLS LAST
+  Projection: customer.c_custkey
+    Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.__value
+      Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey
+        TableScan: customer projection=[c_custkey, c_acctbal]
+        SubqueryAlias: __scalar_sq_1
+          Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value
+            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]
+              Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.__value
+                Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey
+                  TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
+                  SubqueryAlias: __scalar_sq_2
+                    Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value
+                      Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]
+                        TableScan: lineitem projection=[l_orderkey, l_extendedprice]"#;
     assert_eq!(actual, expected);
 
     Ok(())
@@ -94,12 +94,12 @@ where o_orderstatus in (
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = "Projection: orders.o_orderkey\
-    \n  LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\
-    \n    TableScan: orders projection=[o_orderkey, o_orderstatus]\
-    \n    SubqueryAlias: __sq_1\
-    \n      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey\
-    \n        TableScan: lineitem projection=[l_orderkey, l_linestatus]";
+    let expected = r#"Projection: orders.o_orderkey
+  LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey
+    TableScan: orders projection=[o_orderkey, o_orderstatus]
+    SubqueryAlias: __correlated_sq_1
+      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey
+        TableScan: lineitem projection=[l_orderkey, l_linestatus]"#;
     assert_eq!(actual, expected);
 
     // assert data
@@ -140,32 +140,32 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#;
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\
-    \n  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\
-    \n    Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name\
-    \n      Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value\
-    \n        Inner Join: nation.n_regionkey = region.r_regionkey\
-    \n          Inner Join: supplier.s_nationkey = nation.n_nationkey\
-    \n            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
-    \n              Inner Join: part.p_partkey = partsupp.ps_partkey\
-    \n                Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8(\"%BRASS\")\
-    \n                  TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8(\"%BRASS\")]\
-    \n                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
-    \n              TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\
-    \n            TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\
-    \n          Filter: region.r_name = Utf8(\"EUROPE\")\
-    \n            TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]\
-    \n        SubqueryAlias: __sq_1\
-    \n          Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value\
-    \n            Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]\
-    \n              Inner Join: nation.n_regionkey = region.r_regionkey\
-    \n                Inner Join: supplier.s_nationkey = nation.n_nationkey\
-    \n                  Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
-    \n                    TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
-    \n                    TableScan: supplier projection=[s_suppkey, s_nationkey]\
-    \n                  TableScan: nation projection=[n_nationkey, n_regionkey]\
-    \n                Filter: region.r_name = Utf8(\"EUROPE\")\
-    \n                  TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]";
+    let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
+  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
+    Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
+      Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
+        Inner Join: nation.n_regionkey = region.r_regionkey
+          Inner Join: supplier.s_nationkey = nation.n_nationkey
+            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+              Inner Join: part.p_partkey = partsupp.ps_partkey
+                Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
+                  TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
+                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
+              TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
+            TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
+          Filter: region.r_name = Utf8("EUROPE")
+            TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
+        SubqueryAlias: __scalar_sq_1
+          Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
+            Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
+              Inner Join: nation.n_regionkey = region.r_regionkey
+                Inner Join: supplier.s_nationkey = nation.n_nationkey
+                  Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+                    TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
+                    TableScan: supplier projection=[s_suppkey, s_nationkey]
+                  TableScan: nation projection=[n_nationkey, n_regionkey]
+                Filter: region.r_name = Utf8("EUROPE")
+                  TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"#;
     assert_eq!(actual, expected);
 
     // assert data
@@ -230,7 +230,6 @@ async fn tpch_q4_correlated() -> Result<()> {
     Ok(())
 }
 
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/3437
 #[tokio::test]
 async fn tpch_q17_correlated() -> Result<()> {
     let parts = r#"63700,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#23,PROMO BURNISHED COPPER,7,MED BOX,901.00,ly. slyly ironi
@@ -255,17 +254,18 @@ async fn tpch_q17_correlated() -> Result<()> {
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / CAST(Float64(7) AS Decimal128(38, 33)) AS avg_yearly
+    let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
-    Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value
-      Inner Join: part.p_partkey = __sq_1.l_partkey
+    Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
+      Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
         Inner Join: lineitem.l_partkey = part.p_partkey
           TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
           Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
             TableScan: part projection=[p_partkey, p_brand, p_container]
-        Projection: lineitem.l_partkey, CAST(Float64(0.2) AS Decimal128(38, 21)) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
-            TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]"#
+        SubqueryAlias: __scalar_sq_1
+          Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
+            Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
+              TableScan: lineitem projection=[l_partkey, l_quantity]"#
         .to_string();
     assert_eq!(actual, expected);
 
@@ -275,7 +275,7 @@ async fn tpch_q17_correlated() -> Result<()> {
         "+--------------------+",
         "| avg_yearly         |",
         "+--------------------+",
-        "| 1901.3714285714286 |",
+        "| 190.13714285714286 |",
         "+--------------------+",
     ];
     assert_batches_eq!(expected, &results);
@@ -309,28 +309,28 @@ order by s_name;
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = "Sort: supplier.s_name ASC NULLS LAST\
-    \n  Projection: supplier.s_name, supplier.s_address\
-    \n    LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey\
-    \n      Inner Join: supplier.s_nationkey = nation.n_nationkey\
-    \n        TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]\
-    \n        Filter: nation.n_name = Utf8(\"CANADA\")\
-    \n          TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"CANADA\")]\
-    \n      SubqueryAlias: __sq_1\
-    \n        Projection: partsupp.ps_suppkey AS ps_suppkey\
-    \n          Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value\
-    \n            Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey\
-    \n              LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey\
-    \n                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]\
-    \n                SubqueryAlias: __sq_2\
-    \n                  Projection: part.p_partkey AS p_partkey\
-    \n                    Filter: part.p_name LIKE Utf8(\"forest%\")\
-    \n                      TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8(\"forest%\")]\
-    \n              SubqueryAlias: __sq_3\
-    \n                Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value\
-    \n                  Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]\
-    \n                    Filter: lineitem.l_shipdate >= Date32(\"8766\")\
-    \n                      TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32(\"8766\")]";
+    let expected = r#"Sort: supplier.s_name ASC NULLS LAST
+  Projection: supplier.s_name, supplier.s_address
+    LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
+      Inner Join: supplier.s_nationkey = nation.n_nationkey
+        TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
+        Filter: nation.n_name = Utf8("CANADA")
+          TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")]
+      SubqueryAlias: __correlated_sq_1
+        Projection: partsupp.ps_suppkey AS ps_suppkey
+          Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
+            Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
+              LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
+                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
+                SubqueryAlias: __correlated_sq_2
+                  Projection: part.p_partkey AS p_partkey
+                    Filter: part.p_name LIKE Utf8("forest%")
+                      TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")]
+              SubqueryAlias: __scalar_sq_1
+                Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
+                  Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
+                    Filter: lineitem.l_shipdate >= Date32("8766")
+                      TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"#;
     assert_eq!(actual, expected);
 
     // assert data
@@ -364,22 +364,22 @@ order by cntrycode;"#;
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = "Sort: custsale.cntrycode ASC NULLS LAST\
-    \n  Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\
-    \n    Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]\
-    \n      SubqueryAlias: custsale\
-    \n        Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal\
-    \n          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value\
-    \n            CrossJoin:\
-    \n              LeftAnti Join: customer.c_custkey = orders.o_custkey\
-    \n                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
-    \n                  TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])]\
-    \n                TableScan: orders projection=[o_custkey]\
-    \n              SubqueryAlias: __sq_1\
-    \n                Projection: AVG(customer.c_acctbal) AS __value\
-    \n                  Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]\
-    \n                    Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
-    \n                      TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")]), customer.c_acctbal > Decimal128(Some(0),15,2)]";
+    let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST
+  Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
+    Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
+      SubqueryAlias: custsale
+        Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
+          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
+            CrossJoin:
+              LeftAnti Join: customer.c_custkey = orders.o_custkey
+                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
+                  TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]
+                TableScan: orders projection=[o_custkey]
+              SubqueryAlias: __scalar_sq_1
+                Projection: AVG(customer.c_acctbal) AS __value
+                  Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
+                    Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
+                      TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"#;
     assert_eq!(expected, actual);
 
     // assert data
@@ -420,26 +420,26 @@ order by value desc;
     let dataframe = ctx.sql(sql).await.unwrap();
     let plan = dataframe.into_optimized_plan().unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected =  "Sort: value DESC NULLS FIRST\
-    \n  Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\
-    \n    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))\
-    \n      CrossJoin:\
-    \n        Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
-    \n          Inner Join: supplier.s_nationkey = nation.n_nationkey\
-    \n            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
-    \n              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\
-    \n              TableScan: supplier projection=[s_suppkey, s_nationkey]\
-    \n            Filter: nation.n_name = Utf8(\"GERMANY\")\
-    \n              TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]\
-    \n        SubqueryAlias: __sq_1\
-    \n          Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value\
-    \n            Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
-    \n              Inner Join: supplier.s_nationkey = nation.n_nationkey\
-    \n                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
-    \n                  TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]\
-    \n                  TableScan: supplier projection=[s_suppkey, s_nationkey]\
-    \n                Filter: nation.n_name = Utf8(\"GERMANY\")\
-    \n                  TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]";
+    let expected = r#"Sort: value DESC NULLS FIRST
+  Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
+    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
+      CrossJoin:
+        Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
+          Inner Join: supplier.s_nationkey = nation.n_nationkey
+            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
+              TableScan: supplier projection=[s_suppkey, s_nationkey]
+            Filter: nation.n_name = Utf8("GERMANY")
+              TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
+        SubqueryAlias: __scalar_sq_1
+          Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
+            Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
+              Inner Join: supplier.s_nationkey = nation.n_nationkey
+                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+                  TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
+                  TableScan: supplier projection=[s_suppkey, s_nationkey]
+                Filter: nation.n_name = Utf8("GERMANY")
+                  TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"#;
     assert_eq!(actual, expected);
 
     // assert data
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/alias.rs
similarity index 50%
copy from datafusion/optimizer/src/lib.rs
copy to datafusion/optimizer/src/alias.rs
index 27e6dff08..70fdeb7ab 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/alias.rs
@@ -15,31 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub mod common_subexpr_eliminate;
-pub mod decorrelate_where_exists;
-pub mod decorrelate_where_in;
-pub mod eliminate_cross_join;
-pub mod eliminate_filter;
-pub mod eliminate_limit;
-pub mod eliminate_outer_join;
-pub mod extract_equijoin_predicate;
-pub mod filter_null_join_keys;
-pub mod inline_table_scan;
-pub mod optimizer;
-pub mod propagate_empty_relation;
-pub mod push_down_filter;
-pub mod push_down_limit;
-pub mod push_down_projection;
-pub mod scalar_subquery_to_join;
-pub mod simplify_expressions;
-pub mod single_distinct_to_groupby;
-pub mod type_coercion;
-pub mod utils;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
-pub mod rewrite_disjunctive_predicate;
-#[cfg(test)]
-pub mod test;
-pub mod unwrap_cast_in_comparison;
+/// A utility struct that can be used to generate unique aliases when optimizing queries
+pub struct AliasGenerator {
+    next_id: AtomicUsize,
+}
 
-pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
-pub use utils::optimize_children;
+impl Default for AliasGenerator {
+    fn default() -> Self {
+        Self {
+            next_id: AtomicUsize::new(1),
+        }
+    }
+}
+
+impl AliasGenerator {
+    /// Create a new [`AliasGenerator`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Return a unique alias with the provided prefix
+    pub fn next(&self, prefix: &str) -> String {
+        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
+        format!("{}_{}", prefix, id)
+    }
+}
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index c2a80ac2b..1aa976ce8 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::alias::AliasGenerator;
 use crate::optimizer::ApplyOrder;
 use crate::utils::{
     alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols,
@@ -28,12 +29,14 @@ use log::debug;
 use std::sync::Arc;
 
 #[derive(Default)]
-pub struct DecorrelateWhereIn {}
+pub struct DecorrelateWhereIn {
+    alias: AliasGenerator,
+}
 
 impl DecorrelateWhereIn {
     #[allow(missing_docs)]
     pub fn new() -> Self {
-        Self {}
+        Self::default()
     }
 
     /// Finds expressions that have a where in subquery (and recurses when found)
@@ -96,8 +99,12 @@ impl OptimizerRule for DecorrelateWhereIn {
                 // iterate through all exists clauses in predicate, turning each into a join
                 let mut cur_input = filter.input.as_ref().clone();
                 for subquery in subqueries {
-                    cur_input =
-                        optimize_where_in(&subquery, &cur_input, &other_exprs, config)?;
+                    cur_input = optimize_where_in(
+                        &subquery,
+                        &cur_input,
+                        &other_exprs,
+                        &self.alias,
+                    )?;
                 }
                 Ok(Some(cur_input))
             }
@@ -118,8 +125,8 @@ fn optimize_where_in(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
     outer_other_exprs: &[Expr],
-    config: &dyn OptimizerConfig,
-) -> datafusion_common::Result<LogicalPlan> {
+    alias: &AliasGenerator,
+) -> Result<LogicalPlan> {
     let proj = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
     let mut subqry_input = proj.input.clone();
@@ -161,7 +168,7 @@ fn optimize_where_in(
         merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], &outer_cols));
 
     // build subquery side of join - the thing the subquery was querying
-    let subqry_alias = format!("__sq_{}", config.next_id());
+    let subqry_alias = alias.next("__correlated_sq");
     let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
     if let Some(expr) = conjunction(other_subqry_exprs) {
         // if the subquery had additional expressions, restore them
@@ -256,13 +263,13 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n      SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n        Projection: sq_1.c AS c [c:UInt32]\
         \n          TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: __sq_2 [c:UInt32]\
+        \n    SubqueryAlias: __correlated_sq_2 [c:UInt32]\
         \n      Projection: sq_2.c AS c [c:UInt32]\
         \n        TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -286,9 +293,9 @@ mod tests {
 
         let expected = "Projection: test.b [b:UInt32]\
         \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
-        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n      SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n        Projection: sq.c AS c [c:UInt32]\
         \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -340,9 +347,9 @@ mod tests {
         \n    Subquery: [c:UInt32]\
         \n      Projection: sq1.c [c:UInt32]\
         \n        TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
-        \n    LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      SubqueryAlias: __sq_1 [c:UInt32]\
+        \n      SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n        Projection: sq2.c AS c [c:UInt32]\
         \n          TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -365,13 +372,13 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.b = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: __sq_1 [a:UInt32]\
+        \n    SubqueryAlias: __correlated_sq_1 [a:UInt32]\
         \n      Projection: sq.a AS a [a:UInt32]\
-        \n        LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n        LeftSemi Join: sq.a = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
         \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
-        \n          SubqueryAlias: __sq_2 [c:UInt32]\
+        \n          SubqueryAlias: __correlated_sq_2 [c:UInt32]\
         \n            Projection: sq_nested.c AS c [c:UInt32]\
         \n              TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -401,9 +408,9 @@ mod tests {
         \n        TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
         \n    SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
         \n      Projection: test.b, test.c [b:UInt32, c:UInt32]\
-        \n        LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n        LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n          SubqueryAlias: __sq_1 [c:UInt32]\
+        \n          SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n            Projection: sq_inner.c AS c [c:UInt32]\
         \n              TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -436,13 +443,13 @@ mod tests {
         debug!("plan to optimize:\n{}", plan.display_indent());
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
-        \n    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n        Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n    SubqueryAlias: __sq_2 [o_custkey:Int64]\n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
         assert_optimized_plan_eq_display_indent(
             Arc::new(DecorrelateWhereIn::new()),
@@ -479,13 +486,13 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
-        \n        LeftSemi Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n        LeftSemi Join: orders.o_orderkey = __correlated_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n          SubqueryAlias: __sq_2 [l_orderkey:Int64]\
+        \n          SubqueryAlias: __correlated_sq_2 [l_orderkey:Int64]\
         \n            Projection: lineitem.l_orderkey AS l_orderkey [l_orderkey:Int64]\
         \n              TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
 
@@ -517,9 +524,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n        Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -549,9 +556,9 @@ mod tests {
 
         // Query will fail, but we can still transform the plan
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n        Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -580,9 +587,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n        Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -611,9 +618,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n  LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
@@ -793,9 +800,9 @@ mod tests {
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
         \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
-        \n    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
         \n        Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
         \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
@@ -858,9 +865,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.c = __correlated_sq_1.c, test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: __sq_1 [c:UInt32, a:UInt32]\
+        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
         \n      Projection: sq.c AS c, sq.a AS a [c:UInt32, a:UInt32]\
         \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -882,9 +889,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: __sq_1 [c:UInt32]\
+        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n      Projection: sq.c AS c [c:UInt32]\
         \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
@@ -906,9 +913,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftAnti Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n    SubqueryAlias: __sq_1 [c:UInt32]\
+        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
         \n      Projection: sq.c AS c [c:UInt32]\
         \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 27e6dff08..cd743fcda 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod alias;
 pub mod common_subexpr_eliminate;
 pub mod decorrelate_where_exists;
 pub mod decorrelate_where_in;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 5d2dd45af..7383b311c 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -41,7 +41,6 @@ use chrono::{DateTime, Utc};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use log::{debug, trace, warn};
-use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -83,11 +82,6 @@ pub trait OptimizerConfig {
 
     /// How many times to attempt to optimize the plan
     fn max_passes(&self) -> u8;
-
-    /// Return a unique ID
-    ///
-    /// This is useful for assigning unique names to aliases
-    fn next_id(&self) -> usize;
 }
 
 /// A standalone [`OptimizerConfig`] that can be used independently
@@ -97,8 +91,6 @@ pub struct OptimizerContext {
     /// Query execution start time that can be used to rewrite
     /// expressions such as `now()` to use a literal value instead
     query_execution_start_time: DateTime<Utc>,
-    /// id generator for optimizer passes
-    next_id: AtomicUsize,
     /// Option to skip rules that produce errors
     skip_failing_rules: bool,
     /// Specify whether to enable the filter_null_keys rule
@@ -112,7 +104,6 @@ impl OptimizerContext {
     pub fn new() -> Self {
         Self {
             query_execution_start_time: Utc::now(),
-            next_id: AtomicUsize::new(1),
             skip_failing_rules: true,
             filter_null_keys: true,
             max_passes: 3,
@@ -172,12 +163,6 @@ impl OptimizerConfig for OptimizerContext {
     fn max_passes(&self) -> u8 {
         self.max_passes
     }
-
-    fn next_id(&self) -> usize {
-        use std::sync::atomic::Ordering;
-        // Can use relaxed ordering as not used for synchronisation
-        self.next_id.fetch_add(1, Ordering::Relaxed)
-    }
 }
 
 /// A rule-based optimizer.
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index c0ea975f3..51c4142af 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::alias::AliasGenerator;
 use crate::optimizer::ApplyOrder;
 use crate::utils::{
     conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
@@ -30,12 +31,14 @@ use std::sync::Arc;
 
 /// Optimizer rule for rewriting subquery filters to joins
 #[derive(Default)]
-pub struct ScalarSubqueryToJoin {}
+pub struct ScalarSubqueryToJoin {
+    alias: Arc<AliasGenerator>,
+}
 
 impl ScalarSubqueryToJoin {
     #[allow(missing_docs)]
     pub fn new() -> Self {
-        Self {}
+        Self::default()
     }
 
     /// Finds expressions that have a scalar subquery in them (and recurses when found)
@@ -110,7 +113,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                 let mut cur_input = filter.input.as_ref().clone();
                 for subquery in subqueries {
                     if let Some(optimized_subquery) =
-                        optimize_scalar(&subquery, &cur_input, &other_exprs, config)?
+                        optimize_scalar(&subquery, &cur_input, &other_exprs, &self.alias)?
                     {
                         cur_input = optimized_subquery;
                     } else {
@@ -173,7 +176,7 @@ fn optimize_scalar(
     query_info: &SubqueryInfo,
     filter_input: &LogicalPlan,
     outer_others: &[Expr],
-    config: &dyn OptimizerConfig,
+    alias: &AliasGenerator,
 ) -> Result<Option<LogicalPlan>> {
     let subquery = query_info.query.subquery.as_ref();
     debug!(
@@ -242,7 +245,7 @@ fn optimize_scalar(
     }
 
     // Only operate if one column is present and the other closed upon from outside scope
-    let subqry_alias = format!("__sq_{}", config.next_id());
+    let subqry_alias = alias.next("__scalar_sq");
     let group_by: Vec<_> = subqry_cols
         .iter()
         .map(|it| Expr::Column(it.clone()))
@@ -386,16 +389,16 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
-        \n    Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
-        \n      Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
-        \n        Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n  Filter: Int32(1) < __scalar_sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+        \n    Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+        \n      Filter: Int32(1) < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n        Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
         \n          TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n          SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n          SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
         \n            Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
         \n              Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
         \n                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n      SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_2 [o_custkey:Int64, __value:Int64;N]\
         \n        Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
         \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
         \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -439,16 +442,16 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: customer.c_acctbal < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
-        \n    Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+        \n  Filter: customer.c_acctbal < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+        \n    Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Float64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Float64;N]\
         \n        Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value [o_custkey:Int64, __value:Float64;N]\
         \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]\
-        \n            Filter: orders.o_totalprice < __sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
-        \n              Inner Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+        \n            Filter: orders.o_totalprice < __scalar_sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+        \n              Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
         \n                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
-        \n                SubqueryAlias: __sq_2 [l_orderkey:Int64, __value:Float64;N]\
+        \n                SubqueryAlias: __scalar_sq_2 [l_orderkey:Int64, __value:Float64;N]\
         \n                  Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value [l_orderkey:Int64, __value:Float64;N]\
         \n                    Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]\
         \n                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
@@ -481,9 +484,9 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n  Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
         \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n    SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n    SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
         \n      Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
         \n        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
         \n          Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -515,10 +518,10 @@ mod tests {
 
         // it will optimize, but fail for the same reason the unoptimized query would
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n  Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
         \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
         \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
         \n            Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -548,10 +551,10 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n  Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
         \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
         \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
         \n            Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
@@ -739,10 +742,10 @@ mod tests {
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
         \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
-        \n    Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
-        \n      Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n    Filter: customer.c_custkey >= __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n      Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
         \n        TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n        SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n        SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
         \n          Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
         \n            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
         \n              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -776,9 +779,9 @@ mod tests {
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
         \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
-        \n    Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n    Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\
         \n        Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
         \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
         \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -845,10 +848,10 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.c [c:UInt32]\
-        \n  Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
-        \n    Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+        \n  Filter: test.c < __scalar_sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+        \n    Inner Join: test.a = __scalar_sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n      SubqueryAlias: __sq_1 [a:UInt32, __value:UInt32;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [a:UInt32, __value:UInt32;N]\
         \n        Projection: sq.a, MIN(sq.c) AS __value [a:UInt32, __value:UInt32;N]\
         \n          Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]\
         \n            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -877,10 +880,10 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n  Filter: customer.c_custkey < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
         \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
         \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
         \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
@@ -908,10 +911,10 @@ mod tests {
             .build()?;
 
         let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
-        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n  Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
         \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
-        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n      SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\
         \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
         \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
         \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 3a09af2f2..7d5c12181 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -64,10 +64,10 @@ fn subquery_filter_with_cast() -> Result<()> {
     )";
     let plan = test_sql(sql)?;
     let expected = "Projection: test.col_int32\
-    \n  Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\
+    \n  Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.__value\
     \n    CrossJoin:\
     \n      TableScan: test projection=[col_int32]\
-    \n      SubqueryAlias: __sq_1\
+    \n      SubqueryAlias: __scalar_sq_1\
     \n        Projection: AVG(test.col_int32) AS __value\
     \n          Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
     \n            Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\