You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/27 12:03:07 UTC

[arrow-datafusion] branch master updated: API-break: Support `SubqueryAlias` and remove `Alias in Projection` (#4333)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ad3df7dd4 API-break: Support `SubqueryAlias` and remove `Alias in Projection` (#4333)
ad3df7dd4 is described below

commit ad3df7dd49f37d642422d53c8e1a49e7828d7d40
Author: jakevin <ja...@gmail.com>
AuthorDate: Sun Nov 27 20:03:02 2022 +0800

    API-break: Support `SubqueryAlias` and remove `Alias in Projection` (#4333)
    
    * subqueryAlias replace alias-projection
    
    * remove alias in projection.
    
    * correct plan in test
    
    * correct tpch plan
    
    * correct plan in UT.
    
    * fix conflict and add doc for `with_alias`
---
 benchmarks/expected-plans/q11.txt                  |  17 +-
 benchmarks/expected-plans/q13.txt                  |  17 +-
 benchmarks/expected-plans/q15.txt                  |  25 +--
 benchmarks/expected-plans/q16.txt                  |   7 +-
 benchmarks/expected-plans/q18.txt                  |   9 +-
 benchmarks/expected-plans/q2.txt                   |  21 ++-
 benchmarks/expected-plans/q20.txt                  |  27 +--
 benchmarks/expected-plans/q22.txt                  |  11 +-
 benchmarks/expected-plans/q7.txt                   |   4 +-
 benchmarks/expected-plans/q8.txt                   |   2 +-
 benchmarks/expected-plans/q9.txt                   |   2 +-
 datafusion/core/src/dataframe.rs                   |   6 +-
 datafusion/core/src/datasource/view.rs             |   5 +-
 datafusion/core/tests/sql/explain_analyze.rs       |  24 +--
 datafusion/core/tests/sql/joins.rs                 |  14 +-
 datafusion/core/tests/sql/subqueries.rs            | 207 +++++++++++----------
 datafusion/core/tests/sql/window.rs                |  52 +++---
 datafusion/expr/src/logical_plan/builder.rs        |  59 +++---
 datafusion/expr/src/logical_plan/plan.rs           |  21 +--
 datafusion/expr/src/utils.rs                       |   9 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |  14 +-
 datafusion/optimizer/src/decorrelate_where_in.rs   | 131 +++++++------
 datafusion/optimizer/src/filter_push_down.rs       |  42 +++--
 datafusion/optimizer/src/inline_table_scan.rs      |  10 +-
 datafusion/optimizer/src/limit_push_down.rs        |   3 +-
 datafusion/optimizer/src/optimizer.rs              |   1 -
 datafusion/optimizer/src/projection_push_down.rs   |  20 +-
 .../optimizer/src/scalar_subquery_to_join.rs       | 178 +++++++++---------
 .../optimizer/src/subquery_filter_to_join.rs       |  14 +-
 datafusion/optimizer/tests/integration-test.rs     |  42 +++--
 datafusion/proto/src/logical_plan.rs               |  41 ++--
 datafusion/sql/src/planner.rs                      | 163 ++++++++--------
 32 files changed, 607 insertions(+), 591 deletions(-)

diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt
index 0e886e2e7..e0ccdfce8 100644
--- a/benchmarks/expected-plans/q11.txt
+++ b/benchmarks/expected-plans/q11.txt
@@ -9,11 +9,12 @@ Sort: value DESC NULLS FIRST
               TableScan: supplier projection=[s_suppkey, s_nationkey]
             Filter: nation.n_name = Utf8("GERMANY")
               TableScan: nation projection=[n_nationkey, n_name]
-        Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
-            Inner Join: supplier.s_nationkey = nation.n_nationkey
-              Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
-                TableScan: supplier projection=[s_suppkey, s_nationkey]
-              Filter: nation.n_name = Utf8("GERMANY")
-                TableScan: nation projection=[n_nationkey, n_name]
\ No newline at end of file
+        SubqueryAlias: __sq_1
+          Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
+            Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
+              Inner Join: supplier.s_nationkey = nation.n_nationkey
+                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+                  TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
+                  TableScan: supplier projection=[s_suppkey, s_nationkey]
+                Filter: nation.n_name = Utf8("GERMANY")
+                  TableScan: nation projection=[n_nationkey, n_name]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q13.txt b/benchmarks/expected-plans/q13.txt
index b6106a971..9eea8a161 100644
--- a/benchmarks/expected-plans/q13.txt
+++ b/benchmarks/expected-plans/q13.txt
@@ -1,11 +1,12 @@
 Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST
   Projection: c_orders.c_count, COUNT(UInt8(1)) AS custdist
     Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1))]]
-      Projection: c_orders.COUNT(orders.o_orderkey) AS c_count, alias=c_orders
-        Projection: COUNT(orders.o_orderkey), alias=c_orders
-          Projection: COUNT(orders.o_orderkey)
-            Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]]
-              Left Join: customer.c_custkey = orders.o_custkey
-                TableScan: customer projection=[c_custkey]
-                Filter: orders.o_comment NOT LIKE Utf8("%special%requests%")
-                  TableScan: orders projection=[o_orderkey, o_custkey, o_comment]
\ No newline at end of file
+      SubqueryAlias: c_orders
+        Projection: c_orders.COUNT(orders.o_orderkey) AS c_count
+          SubqueryAlias: c_orders
+            Projection: COUNT(orders.o_orderkey)
+              Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]]
+                Left Join: customer.c_custkey = orders.o_custkey
+                  TableScan: customer projection=[c_custkey]
+                  Filter: orders.o_comment NOT LIKE Utf8("%special%requests%")
+                    TableScan: orders projection=[o_orderkey, o_custkey, o_comment]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index e78f8e0d9..1100d17b6 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -4,18 +4,21 @@ Sort: supplier.s_suppkey ASC NULLS LAST
     Inner Join: revenue0.total_revenue = __sq_1.__value
       Inner Join: supplier.s_suppkey = revenue0.supplier_no
         TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
-        Projection: supplier_no, total_revenue, alias=revenue0
-          Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
-            Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
-              Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
-                Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
-                  TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
-      Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
-        Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
-          Projection: total_revenue, alias=revenue0
-            Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
-              Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+        SubqueryAlias: revenue0
+          Projection: supplier_no, total_revenue
+            Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+              Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
                 Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
                   Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
                     TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
+      SubqueryAlias: __sq_1
+        Projection: MAX(revenue0.total_revenue) AS __value
+          Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
+            SubqueryAlias: revenue0
+              Projection: total_revenue
+                Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+                  Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+                    Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+                      Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
+                        TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
 EmptyRelation
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt
index 5268aee70..60ef26933 100644
--- a/benchmarks/expected-plans/q16.txt
+++ b/benchmarks/expected-plans/q16.txt
@@ -8,6 +8,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS
               TableScan: partsupp projection=[ps_partkey, ps_suppkey]
               Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
                 TableScan: part projection=[p_partkey, p_brand, p_type, p_size]
-            Projection: supplier.s_suppkey AS s_suppkey, alias=__sq_1
-              Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
-                TableScan: supplier projection=[s_suppkey, s_comment]
+            SubqueryAlias: __sq_1
+              Projection: supplier.s_suppkey AS s_suppkey
+                Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
+                  TableScan: supplier projection=[s_suppkey, s_comment]
diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt
index ce0b20c20..4017722c5 100644
--- a/benchmarks/expected-plans/q18.txt
+++ b/benchmarks/expected-plans/q18.txt
@@ -7,7 +7,8 @@ Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
             TableScan: customer projection=[c_custkey, c_name]
             TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
           TableScan: lineitem projection=[l_orderkey, l_quantity]
-        Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1
-          Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
-            Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
-              TableScan: lineitem projection=[l_orderkey, l_quantity]
\ No newline at end of file
+        SubqueryAlias: __sq_1
+          Projection: lineitem.l_orderkey AS l_orderkey
+            Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
+              Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
+                TableScan: lineitem projection=[l_orderkey, l_quantity]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q2.txt b/benchmarks/expected-plans/q2.txt
index e97305509..845d79263 100644
--- a/benchmarks/expected-plans/q2.txt
+++ b/benchmarks/expected-plans/q2.txt
@@ -13,13 +13,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie
             TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
           Filter: region.r_name = Utf8("EUROPE")
             TableScan: region projection=[r_regionkey, r_name]
-        Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
-            Inner Join: nation.n_regionkey = region.r_regionkey
-              Inner Join: supplier.s_nationkey = nation.n_nationkey
-                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-                  TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
-                  TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
-                TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
-              Filter: region.r_name = Utf8("EUROPE")
-                TableScan: region projection=[r_regionkey, r_name]
\ No newline at end of file
+        SubqueryAlias: __sq_1
+          Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
+            Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
+              Inner Join: nation.n_regionkey = region.r_regionkey
+                Inner Join: supplier.s_nationkey = nation.n_nationkey
+                  Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
+                    TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
+                    TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
+                  TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
+                Filter: region.r_name = Utf8("EUROPE")
+                  TableScan: region projection=[r_regionkey, r_name]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt
index 0d095a735..1266622ea 100644
--- a/benchmarks/expected-plans/q20.txt
+++ b/benchmarks/expected-plans/q20.txt
@@ -5,15 +5,18 @@ Sort: supplier.s_name ASC NULLS LAST
         TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
         Filter: nation.n_name = Utf8("CANADA")
           TableScan: nation projection=[n_nationkey, n_name]
-      Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
-        Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
-          Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
-            LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
-              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
-              Projection: part.p_partkey AS p_partkey, alias=__sq_1
-                Filter: part.p_name LIKE Utf8("forest%")
-                  TableScan: part projection=[p_partkey, p_name]
-            Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3
-              Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
-                Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
-                  TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate]
\ No newline at end of file
+      SubqueryAlias: __sq_2
+        Projection: partsupp.ps_suppkey AS ps_suppkey
+          Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
+            Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
+              LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
+                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
+                SubqueryAlias: __sq_1
+                  Projection: part.p_partkey AS p_partkey
+                    Filter: part.p_name LIKE Utf8("forest%")
+                      TableScan: part projection=[p_partkey, p_name]
+              SubqueryAlias: __sq_3
+                Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
+                  Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
+                    Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
+                      TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt
index 81c12ba91..82060bd59 100644
--- a/benchmarks/expected-plans/q22.txt
+++ b/benchmarks/expected-plans/q22.txt
@@ -1,7 +1,7 @@
 Sort: custsale.cntrycode ASC NULLS LAST
   Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
     Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
-      Projection: cntrycode, customer.c_acctbal, alias=custsale
+      SubqueryAlias: custsale
         Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
           Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value
             CrossJoin:
@@ -9,7 +9,8 @@ Sort: custsale.cntrycode ASC NULLS LAST
                 Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
                   TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
                 TableScan: orders projection=[o_custkey]
-              Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1
-                Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
-                  Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
-                    TableScan: customer projection=[c_phone, c_acctbal]
\ No newline at end of file
+              SubqueryAlias: __sq_1
+                Projection: AVG(customer.c_acctbal) AS __value
+                  Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
+                    Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
+                      TableScan: customer projection=[c_phone, c_acctbal]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q7.txt b/benchmarks/expected-plans/q7.txt
index 3a5d88965..74857c6f9 100644
--- a/benchmarks/expected-plans/q7.txt
+++ b/benchmarks/expected-plans/q7.txt
@@ -1,7 +1,7 @@
 Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, shipping.l_year ASC NULLS LAST
   Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue
     Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]]
-      Projection: supp_nation, cust_nation, l_year, volume, alias=shipping
+      SubqueryAlias: shipping
         Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume
           Filter: (n1.n_name = Utf8("FRANCE") OR n2.n_name = Utf8("FRANCE")) AND (n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY"))
             Inner Join: customer.c_nationkey = n2.n_nationkey
@@ -19,4 +19,4 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST,
                     TableScan: nation projection=[n_nationkey, n_name]
               Filter: n2.n_name = Utf8("GERMANY") OR n2.n_name = Utf8("FRANCE")
                 SubqueryAlias: n2
-                  TableScan: nation projection=[n_nationkey, n_name]
+                  TableScan: nation projection=[n_nationkey, n_name]
\ No newline at end of file
diff --git a/benchmarks/expected-plans/q8.txt b/benchmarks/expected-plans/q8.txt
index 1b8d08ef8..75e65d835 100644
--- a/benchmarks/expected-plans/q8.txt
+++ b/benchmarks/expected-plans/q8.txt
@@ -1,7 +1,7 @@
 Sort: all_nations.o_year ASC NULLS LAST
   Projection: all_nations.o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) / SUM(all_nations.volume) AS mkt_share
     Aggregate: groupBy=[[all_nations.o_year]], aggr=[[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]]
-      Projection: o_year, volume, nation, alias=all_nations
+      SubqueryAlias: all_nations
         Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, n2.n_name AS nation
           Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n2.n_name
             Inner Join: n1.n_regionkey = region.r_regionkey
diff --git a/benchmarks/expected-plans/q9.txt b/benchmarks/expected-plans/q9.txt
index ae7d4f194..166c98d97 100644
--- a/benchmarks/expected-plans/q9.txt
+++ b/benchmarks/expected-plans/q9.txt
@@ -1,7 +1,7 @@
 Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST
   Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit
     Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]]
-      Projection: nation, o_year, amount, alias=profit
+      SubqueryAlias: profit
         Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(partsupp.ps_supplycost * lineitem.l_quantity AS Decimal128(38, 4)) AS amount
           Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name
             Inner Join: supplier.s_nationkey = nation.n_nationkey
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 3d1c8d009..4a3533227 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -1342,10 +1342,10 @@ mod tests {
         \n  Limit: skip=0, fetch=1\
         \n    Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
         \n      Inner Join: t1.c1 = t2.c1\
-        \n        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
+        \n        SubqueryAlias: t1\
         \n          Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
         \n            TableScan: aggregate_test_100 projection=[c1, c2, c3]\
-        \n        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
+        \n        SubqueryAlias: t2\
         \n          Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
         \n            TableScan: aggregate_test_100 projection=[c1, c2, c3]",
                    format!("{:?}", df_renamed.to_logical_plan()?)
@@ -1388,7 +1388,7 @@ mod tests {
         let plan = df.explain(false, false)?.collect().await?;
         // Filters all the way to Parquet
         let formatted = pretty::pretty_format_batches(&plan).unwrap().to_string();
-        assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));
+        assert!(formatted.contains("FilterExec: id@0 = 1"));
 
         Ok(())
     }
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 5238d4733..34a427030 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -445,7 +445,7 @@ mod tests {
         let formatted = arrow::util::pretty::pretty_format_batches(&plan)
             .unwrap()
             .to_string();
-        assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));
+        assert!(formatted.contains("FilterExec: id@0 = 1"));
         Ok(())
     }
 
@@ -474,7 +474,8 @@ mod tests {
         let formatted = arrow::util::pretty::pretty_format_batches(&plan)
             .unwrap()
             .to_string();
-        assert!(formatted.contains("ParquetExec: limit=Some(10)"));
+        // TODO: limit_push_down support SubqueryAlias
+        assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10"));
         Ok(())
     }
 
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index da290daa9..c4802b43f 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -663,7 +663,7 @@ order by
     \n          Filter: lineitem.l_returnflag = Utf8(\"R\")\
     \n            TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8(\"R\")]\
     \n        TableScan: nation projection=[n_nationkey, n_name]";
-    assert_eq!(expected, format!("{:?}", plan.unwrap()),);
+    assert_eq!(expected, format!("{:?}", plan.unwrap()));
 
     Ok(())
 }
@@ -738,15 +738,13 @@ async fn test_physical_plan_display_indent_multi_children() {
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
         "          ProjectionExec: expr=[c1@0 as c1]",
-        "            ProjectionExec: expr=[c1@0 as c1]",
-        "              RepartitionExec: partitioning=RoundRobinBatch(9000)",
-        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]",
+        "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
+        "              CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
-        "          ProjectionExec: expr=[c2@0 as c2]",
-        "            ProjectionExec: expr=[c1@0 as c2]",
-        "              RepartitionExec: partitioning=RoundRobinBatch(9000)",
-        "                CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]",
+        "          ProjectionExec: expr=[c1@0 as c2]",
+        "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
+        "              CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]",
     ];
 
     let normalizer = ExplainNormalizer::new();
@@ -782,15 +780,15 @@ async fn csv_explain() {
             "logical_plan",
             "Projection: aggregate_test_100.c1\
              \n  Filter: aggregate_test_100.c2 > Int8(10)\
-             \n    TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]"
+             \n    TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]",
         ],
         vec!["physical_plan",
-             "ProjectionExec: expr=[c1@0 as c1]\
+            "ProjectionExec: expr=[c1@0 as c1]\
               \n  CoalesceBatchesExec: target_batch_size=4096\
               \n    FilterExec: c2@1 > 10\
               \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
               \n        CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]\
-              \n"
+              \n",
         ]];
     assert_eq!(expected, actual);
 
@@ -885,7 +883,9 @@ async fn explain_logical_plan_only() {
             "logical_plan",
             "Projection: COUNT(UInt8(1))\
             \n  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
-            \n    Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
+            \n    SubqueryAlias: t\
+            \n      SubqueryAlias: t\
+            \n        Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
         ]];
     assert_eq!(expected, actual);
 }
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 4831cbe4a..87fb594c7 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1635,16 +1635,16 @@ async fn reduce_left_join_3() -> Result<()> {
         let expected = vec![
             "Explain [plan_type:Utf8, plan:Utf8]",
             "  Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-            "    Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-            "        Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "    Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "      Filter: t3.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "        SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
             "          Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-            "            Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-            "              TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-            "            Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "            TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "            Filter: t2.t2_int < UInt32(3) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
             "              TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
             "      TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
-        ];
+        ]
+            ;
         let formatted = plan.display_indent_schema().to_string();
         let actual: Vec<&str> = formatted.trim().lines().collect();
         assert_eq!(
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index 98bf56a02..719c0c3d7 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -50,20 +50,21 @@ where c_acctbal < (
 
     let plan = ctx.optimize(&plan).unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Sort: customer.c_custkey ASC NULLS LAST
-  Projection: customer.c_custkey
-    Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_2.__value
-      Inner Join: customer.c_custkey = __sq_2.o_custkey
-        TableScan: customer projection=[c_custkey, c_acctbal]
-        Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value, alias=__sq_2
-          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]
-            Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value
-              Inner Join: orders.o_orderkey = __sq_1.l_orderkey
-                TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
-                Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value, alias=__sq_1
-                  Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]
-                    TableScan: lineitem projection=[l_orderkey, l_extendedprice]"#
-        .to_string();
+    let expected = "Sort: customer.c_custkey ASC NULLS LAST\
+    \n  Projection: customer.c_custkey\
+    \n    Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_2.__value\
+    \n      Inner Join: customer.c_custkey = __sq_2.o_custkey\
+    \n        TableScan: customer projection=[c_custkey, c_acctbal]\
+    \n        SubqueryAlias: __sq_2\
+    \n          Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\
+    \n            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\
+    \n              Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_1.__value\
+    \n                Inner Join: orders.o_orderkey = __sq_1.l_orderkey\
+    \n                  TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\
+    \n                  SubqueryAlias: __sq_1\
+    \n                    Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\
+    \n                      Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\
+    \n                        TableScan: lineitem projection=[l_orderkey, l_extendedprice]";
     assert_eq!(actual, expected);
 
     Ok(())
@@ -93,12 +94,12 @@ where o_orderstatus in (
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Projection: orders.o_orderkey
-  LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey
-    TableScan: orders projection=[o_orderkey, o_orderstatus]
-    Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey, alias=__sq_1
-      TableScan: lineitem projection=[l_orderkey, l_linestatus]"#
-        .to_string();
+    let expected = "Projection: orders.o_orderkey\
+    \n  LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\
+    \n    TableScan: orders projection=[o_orderkey, o_orderstatus]\
+    \n    SubqueryAlias: __sq_1\
+    \n      Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey\
+    \n        TableScan: lineitem projection=[l_orderkey, l_linestatus]";
     assert_eq!(actual, expected);
 
     // assert data
@@ -139,32 +140,32 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#;
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
-  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
-    Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
-      Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value
-        Inner Join: nation.n_regionkey = region.r_regionkey
-          Inner Join: supplier.s_nationkey = nation.n_nationkey
-            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-              Inner Join: part.p_partkey = partsupp.ps_partkey
-                Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
-                  TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
-                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
-              TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
-            TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
-          Filter: region.r_name = Utf8("EUROPE")
-            TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
-        Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
-            Inner Join: nation.n_regionkey = region.r_regionkey
-              Inner Join: supplier.s_nationkey = nation.n_nationkey
-                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-                  TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
-                  TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
-                TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
-              Filter: region.r_name = Utf8("EUROPE")
-                TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"#
-        .to_string();
+    let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\
+    \n  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\
+    \n    Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name\
+    \n      Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value\
+    \n        Inner Join: nation.n_regionkey = region.r_regionkey\
+    \n          Inner Join: supplier.s_nationkey = nation.n_nationkey\
+    \n            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
+    \n              Inner Join: part.p_partkey = partsupp.ps_partkey\
+    \n                Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8(\"%BRASS\")\
+    \n                  TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8(\"%BRASS\")]\
+    \n                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
+    \n              TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\
+    \n            TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\
+    \n          Filter: region.r_name = Utf8(\"EUROPE\")\
+    \n            TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]\
+    \n        SubqueryAlias: __sq_1\
+    \n          Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value\
+    \n            Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]\
+    \n              Inner Join: nation.n_regionkey = region.r_regionkey\
+    \n                Inner Join: supplier.s_nationkey = nation.n_nationkey\
+    \n                  Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
+    \n                    TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\
+    \n                    TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\
+    \n                  TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\
+    \n                Filter: region.r_name = Utf8(\"EUROPE\")\
+    \n                  TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]";
     assert_eq!(actual, expected);
 
     // assert data
@@ -321,26 +322,28 @@ order by s_name;
         .map_err(|e| format!("{:?} at {}", e, "error"))
         .unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Sort: supplier.s_name ASC NULLS LAST
-  Projection: supplier.s_name, supplier.s_address
-    LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey
-      Inner Join: supplier.s_nationkey = nation.n_nationkey
-        TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
-        Filter: nation.n_name = Utf8("CANADA")
-          TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")]
-      Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
-        Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
-          Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
-            LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
-              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
-              Projection: part.p_partkey AS p_partkey, alias=__sq_1
-                Filter: part.p_name LIKE Utf8("forest%")
-                  TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")]
-            Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3
-              Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
-                Filter: lineitem.l_shipdate >= Date32("8766")
-                  TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"#
-        .to_string();
+    let expected = "Sort: supplier.s_name ASC NULLS LAST\
+    \n  Projection: supplier.s_name, supplier.s_address\
+    \n    LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey\
+    \n      Inner Join: supplier.s_nationkey = nation.n_nationkey\
+    \n        TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]\
+    \n        Filter: nation.n_name = Utf8(\"CANADA\")\
+    \n          TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"CANADA\")]\
+    \n      SubqueryAlias: __sq_2\
+    \n        Projection: partsupp.ps_suppkey AS ps_suppkey\
+    \n          Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value\
+    \n            Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey\
+    \n              LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey\
+    \n                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]\
+    \n                SubqueryAlias: __sq_1\
+    \n                  Projection: part.p_partkey AS p_partkey\
+    \n                    Filter: part.p_name LIKE Utf8(\"forest%\")\
+    \n                      TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8(\"forest%\")]\
+    \n              SubqueryAlias: __sq_3\
+    \n                Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value\
+    \n                  Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]\
+    \n                    Filter: lineitem.l_shipdate >= Date32(\"8766\")\
+    \n                      TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32(\"8766\")]";
     assert_eq!(actual, expected);
 
     // assert data
@@ -380,22 +383,22 @@ order by cntrycode;"#;
         .map_err(|e| format!("{:?} at {}", e, "error"))
         .unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST
-  Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
-    Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
-      Projection: cntrycode, customer.c_acctbal, alias=custsale
-        Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
-          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value
-            CrossJoin:
-              LeftAnti Join: customer.c_custkey = orders.o_custkey
-                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
-                  TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]
-                TableScan: orders projection=[o_custkey]
-              Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1
-                Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
-                  Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
-                    TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"#
-        .to_string();
+    let expected = "Sort: custsale.cntrycode ASC NULLS LAST\
+    \n  Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\
+    \n    Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]\
+    \n      SubqueryAlias: custsale\
+    \n        Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal\
+    \n          Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value\
+    \n            CrossJoin:\
+    \n              LeftAnti Join: customer.c_custkey = orders.o_custkey\
+    \n                Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
+    \n                  TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])]\
+    \n                TableScan: orders projection=[o_custkey]\
+    \n              SubqueryAlias: __sq_1\
+    \n                Projection: AVG(customer.c_acctbal) AS __value\
+    \n                  Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]\
+    \n                    Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\
+    \n                      TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")]), customer.c_acctbal > Decimal128(Some(0),15,2)]";
     assert_eq!(expected, actual);
 
     // assert data
@@ -442,26 +445,26 @@ order by value desc;
         .map_err(|e| format!("{:?} at {}", e, "error"))
         .unwrap();
     let actual = format!("{}", plan.display_indent());
-    let expected = r#"Sort: value DESC NULLS FIRST
-  Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
-    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))
-      CrossJoin:
-        Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
-          Inner Join: supplier.s_nationkey = nation.n_nationkey
-            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
-              TableScan: supplier projection=[s_suppkey, s_nationkey]
-            Filter: nation.n_name = Utf8("GERMANY")
-              TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
-        Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value, alias=__sq_1
-          Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
-            Inner Join: supplier.s_nationkey = nation.n_nationkey
-              Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
-                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
-                TableScan: supplier projection=[s_suppkey, s_nationkey]
-              Filter: nation.n_name = Utf8("GERMANY")
-                TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"#
-        .to_string();
+    let expected = "Sort: value DESC NULLS FIRST\
+    \n  Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\
+    \n    Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))\
+    \n      CrossJoin:\
+    \n        Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
+    \n          Inner Join: supplier.s_nationkey = nation.n_nationkey\
+    \n            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
+    \n              TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\
+    \n              TableScan: supplier projection=[s_suppkey, s_nationkey]\
+    \n            Filter: nation.n_name = Utf8(\"GERMANY\")\
+    \n              TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]\
+    \n        SubqueryAlias: __sq_1\
+    \n          Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value\
+    \n            Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\
+    \n              Inner Join: supplier.s_nationkey = nation.n_nationkey\
+    \n                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\
+    \n                  TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\
+    \n                  TableScan: supplier projection=[s_suppkey, s_nationkey]\
+    \n                Filter: nation.n_name = Utf8(\"GERMANY\")\
+    \n                  TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]";
     assert_eq!(actual, expected);
 
     // assert data
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 9367ee63b..95d0ed929 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -335,18 +335,19 @@ async fn window_expr_eliminate() -> Result<()> {
         "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N]",
         "    Projection: d.b, MAX(d.a) AS max_a [b:Utf8, max_a:Int64;N]",
         "      Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]] [b:Utf8, MAX(d.a):Int64;N]",
-        "        Projection: _data2.a, _data2.b, alias=d [a:Int64, b:Utf8]",
-        "          Projection: s.a, s.b, alias=_data2 [a:Int64, b:Utf8]",
-        "            Projection: a, b, alias=s [a:Int64, b:Utf8]",
-        "              Union [a:Int64, b:Utf8]",
-        "                Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                  EmptyRelation []",
-        "                Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                  EmptyRelation []",
-        "                Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                  EmptyRelation []",
-        "                Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                  EmptyRelation []",
+        "        SubqueryAlias: d [a:Int64, b:Utf8]",
+        "          SubqueryAlias: _data2 [a:Int64, b:Utf8]",
+        "            Projection: s.a, s.b [a:Int64, b:Utf8]",
+        "              SubqueryAlias: s [a:Int64, b:Utf8]",
+        "                Union [a:Int64, b:Utf8]",
+        "                  Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
+        "                    EmptyRelation []",
+        "                  Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
+        "                    EmptyRelation []",
+        "                  Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
+        "                    EmptyRelation []",
+        "                  Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
+        "                    EmptyRelation []",
     ];
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -398,19 +399,20 @@ async fn window_expr_eliminate() -> Result<()> {
         "  Sort: d.b ASC NULLS LAST [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]",
         "    Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq) [b:Utf8, max_a:Int64;N, MAX(d.seq):UInt64;N]",
         "      Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]] [b:Utf8, MAX(d.a):Int64;N, MAX(d.seq):UInt64;N]",
-        "        Projection: _data2.seq, _data2.a, _data2.b, alias=d [seq:UInt64;N, a:Int64, b:Utf8]",
-        "          Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b, alias=_data2 [seq:UInt64;N, a:Int64, b:Utf8]",
-        "            WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]",
-        "              Projection: a, b, alias=s [a:Int64, b:Utf8]",
-        "                Union [a:Int64, b:Utf8]",
-        "                  Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                    EmptyRelation []",
-        "                  Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
-        "                    EmptyRelation []",
-        "                  Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                    EmptyRelation []",
-        "                  Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
-        "                    EmptyRelation []",
+        "        SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]",
+        "          SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]",
+        "            Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]",
+        "              WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST]:UInt64;N, a:Int64, b:Utf8]",
+        "                SubqueryAlias: s [a:Int64, b:Utf8]",
+        "                  Union [a:Int64, b:Utf8]",
+        "                    Projection: Int64(1) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
+        "                      EmptyRelation []",
+        "                    Projection: Int64(3) AS a, Utf8(\"aa\") AS b [a:Int64, b:Utf8]",
+        "                      EmptyRelation []",
+        "                    Projection: Int64(5) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
+        "                      EmptyRelation []",
+        "                    Projection: Int64(7) AS a, Utf8(\"bb\") AS b [a:Int64, b:Utf8]",
+        "                      EmptyRelation []",
     ];
     let formatted = plan.display_indent_schema().to_string();
     let actual: Vec<&str> = formatted.trim().lines().collect();
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index e9ef0f0cf..71097f5a6 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -328,7 +328,6 @@ impl LogicalPlanBuilder {
                 input,
                 mut expr,
                 schema: _,
-                alias,
             }) if missing_cols
                 .iter()
                 .all(|c| input.schema().field_from_column(c).is_ok()) =>
@@ -343,7 +342,7 @@ impl LogicalPlanBuilder {
                 // projected alias.
                 missing_exprs.retain(|e| !expr.contains(e));
                 expr.extend(missing_exprs);
-                Ok(project_with_alias((*input).clone(), expr, alias)?)
+                Ok(project_with_alias((*input).clone(), expr, None)?)
             }
             _ => {
                 let new_inputs = curr_plan
@@ -858,11 +857,10 @@ pub(crate) fn validate_unique_names<'a>(
     })
 }
 
-pub fn project_with_column_index_alias(
+pub fn project_with_column_index(
     expr: Vec<Expr>,
     input: Arc<LogicalPlan>,
     schema: DFSchemaRef,
-    alias: Option<String>,
 ) -> Result<LogicalPlan> {
     let alias_expr = expr
         .into_iter()
@@ -873,9 +871,9 @@ pub fn project_with_column_index_alias(
             x => x.alias(schema.field(i).name()),
         })
         .collect::<Vec<_>>();
-    Ok(LogicalPlan::Projection(
-        Projection::try_new_with_schema_alias(alias_expr, input, schema, alias)?,
-    ))
+    Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+        alias_expr, input, schema,
+    )?))
 }
 
 /// Union two logical plans.
@@ -928,14 +926,13 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
         .map(|p| {
             let plan = coerce_plan_expr_for_schema(&p, &union_schema)?;
             match plan {
-                LogicalPlan::Projection(Projection {
-                    expr, input, alias, ..
-                }) => Ok(Arc::new(project_with_column_index_alias(
-                    expr.to_vec(),
-                    input,
-                    Arc::new(union_schema.clone()),
-                    alias,
-                )?)),
+                LogicalPlan::Projection(Projection { expr, input, .. }) => {
+                    Ok(Arc::new(project_with_column_index(
+                        expr.to_vec(),
+                        input,
+                        Arc::new(union_schema.clone()),
+                    )?))
+                }
                 x => Ok(Arc::new(x)),
             }
         })
@@ -980,19 +977,27 @@ pub fn project_with_alias(
         exprlist_to_fields(&projected_expr, &plan)?,
         plan.schema().metadata().clone(),
     )?;
-    let schema = match alias {
-        Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
-        None => input_schema,
-    };
 
-    Ok(LogicalPlan::Projection(
-        Projection::try_new_with_schema_alias(
-            projected_expr,
-            Arc::new(plan.clone()),
-            DFSchemaRef::new(schema),
-            alias,
-        )?,
-    ))
+    let projection = LogicalPlan::Projection(Projection::try_new_with_schema(
+        projected_expr,
+        Arc::new(plan.clone()),
+        DFSchemaRef::new(input_schema),
+    )?);
+    match alias {
+        Some(alias) => Ok(with_alias(projection, alias)),
+        None => Ok(projection),
+    }
+}
+
+/// Create a SubqueryAlias to wrap a LogicalPlan.
+pub fn with_alias(plan: LogicalPlan, alias: String) -> LogicalPlan {
+    let plan_schema = &**plan.schema();
+    let schema = (plan_schema.clone()).replace_qualifier(alias.as_str());
+    LogicalPlan::SubqueryAlias(SubqueryAlias {
+        input: Arc::new(plan),
+        alias,
+        schema: Arc::new(schema),
+    })
 }
 
 /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index d41ad797c..82e44986a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -781,9 +781,7 @@ impl LogicalPlan {
 
                         Ok(())
                     }
-                    LogicalPlan::Projection(Projection {
-                        ref expr, alias, ..
-                    }) => {
+                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                         write!(f, "Projection: ")?;
                         for (i, expr_item) in expr.iter().enumerate() {
                             if i > 0 {
@@ -791,9 +789,6 @@ impl LogicalPlan {
                             }
                             write!(f, "{:?}", expr_item)?;
                         }
-                        if let Some(a) = alias {
-                            write!(f, ", alias={}", a)?;
-                        }
                         Ok(())
                     }
                     LogicalPlan::Filter(Filter {
@@ -1115,8 +1110,6 @@ pub struct Projection {
     pub input: Arc<LogicalPlan>,
     /// The schema description of the output
     pub schema: DFSchemaRef,
-    /// Projection output relation alias
-    pub alias: Option<String>,
 }
 
 impl Projection {
@@ -1137,16 +1130,6 @@ impl Projection {
         expr: Vec<Expr>,
         input: Arc<LogicalPlan>,
         schema: DFSchemaRef,
-    ) -> Result<Self, DataFusionError> {
-        Self::try_new_with_schema_alias(expr, input, schema, None)
-    }
-
-    /// Create a new Projection using the specified output schema
-    pub fn try_new_with_schema_alias(
-        expr: Vec<Expr>,
-        input: Arc<LogicalPlan>,
-        schema: DFSchemaRef,
-        alias: Option<String>,
     ) -> Result<Self, DataFusionError> {
         if expr.len() != schema.fields().len() {
             return Err(DataFusionError::Plan(format!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len())));
@@ -1155,7 +1138,6 @@ impl Projection {
             expr,
             input,
             schema,
-            alias,
         })
     }
 
@@ -1171,7 +1153,6 @@ impl Projection {
             expr,
             input,
             schema,
-            alias: None,
         }
     }
 
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 341ae928b..c84f4b8d7 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -366,14 +366,13 @@ pub fn from_plan(
     inputs: &[LogicalPlan],
 ) -> Result<LogicalPlan> {
     match plan {
-        LogicalPlan::Projection(Projection { schema, alias, .. }) => Ok(
-            LogicalPlan::Projection(Projection::try_new_with_schema_alias(
+        LogicalPlan::Projection(Projection { schema, .. }) => {
+            Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
                 expr.to_vec(),
                 Arc::new(inputs[0].clone()),
                 schema.clone(),
-                alias.clone(),
-            )?),
-        ),
+            )?))
+        }
         LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
             schema: schema.clone(),
             values: expr
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index fdbda47f2..11ed5cdbe 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -101,7 +101,6 @@ impl OptimizerRule for CommonSubexprEliminate {
                 expr,
                 input,
                 schema,
-                alias,
             }) => {
                 let input_schema = Arc::clone(input.schema());
                 let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
@@ -114,14 +113,11 @@ impl OptimizerRule for CommonSubexprEliminate {
                     optimizer_config,
                 )?;
 
-                Ok(LogicalPlan::Projection(
-                    Projection::try_new_with_schema_alias(
-                        pop_expr(&mut new_expr)?,
-                        Arc::new(new_input),
-                        schema.clone(),
-                        alias.clone(),
-                    )?,
-                ))
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    pop_expr(&mut new_expr)?,
+                    Arc::new(new_input),
+                    schema.clone(),
+                )?))
             }
             LogicalPlan::Filter(filter) => {
                 let input = filter.input();
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index fa0367c45..ba8fba543 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -258,14 +258,15 @@ mod tests {
             .build()?;
         debug!("plan to optimize:\n{}", plan.display_indent());
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
-    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64]
-      TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n        Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n    SubqueryAlias: __sq_2 [o_custkey:Int64]\n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
     }
@@ -296,14 +297,16 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64]
-      LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-        Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1 [l_orderkey:Int64]
-          TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_2 [o_custkey:Int64]\
+        \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n          SubqueryAlias: __sq_1 [l_orderkey:Int64]\
+        \n            Projection: lineitem.l_orderkey AS l_orderkey [l_orderkey:Int64]\
+        \n              TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -328,12 +331,13 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-      Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -355,12 +359,13 @@ mod tests {
             .build()?;
 
         // Query will fail, but we can still transform the plan
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-      Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -381,12 +386,13 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-      Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -407,11 +413,12 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-      TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n      Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -583,12 +590,13 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
-    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
-        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
+        \n    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [o_custkey:Int64]\
+        \n        Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\
+        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -640,11 +648,12 @@ mod tests {
             .project(vec![col("test.b")])?
             .build()?;
 
-        let expected = r#"Projection: test.b [b:UInt32]
-  LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]
-    TableScan: test [a:UInt32, b:UInt32, c:UInt32]
-    Projection: sq.c AS c, sq.a AS a, alias=__sq_1 [c:UInt32, a:UInt32]
-      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: __sq_1 [c:UInt32, a:UInt32]\
+        \n      Projection: sq.c AS c, sq.a AS a [c:UInt32, a:UInt32]\
+        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -659,11 +668,12 @@ mod tests {
             .project(vec![col("test.b")])?
             .build()?;
 
-        let expected = r#"Projection: test.b [b:UInt32]
-  LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
-    TableScan: test [a:UInt32, b:UInt32, c:UInt32]
-    Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
-      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: __sq_1 [c:UInt32]\
+        \n      Projection: sq.c AS c [c:UInt32]\
+        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
@@ -678,11 +688,12 @@ mod tests {
             .project(vec![col("test.b")])?
             .build()?;
 
-        let expected = r#"Projection: test.b [b:UInt32]
-  LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
-    TableScan: test [a:UInt32, b:UInt32, c:UInt32]
-    Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
-      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
+        let expected = "Projection: test.b [b:UInt32]\
+        \n  LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n    SubqueryAlias: __sq_1 [c:UInt32]\
+        \n      Projection: sq.c AS c [c:UInt32]\
+        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_eq(&DecorrelateWhereIn::new(), &plan, expected);
         Ok(())
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 9ba30fae5..2f8a8a8b4 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -550,7 +550,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             input,
             expr,
             schema,
-            alias: _,
         }) => {
             // A projection is filter-commutable, but re-writes all predicate expressions
             // collect projection.
@@ -1194,14 +1193,15 @@ mod tests {
             .build()?;
 
         // filter appears below Union
-        let expected = "\
-            Union\
-            \n  Projection: test.a AS b, alias=test2\
-            \n    Filter: test.a = Int64(1)\
-            \n      TableScan: test\
-            \n  Projection: test.a AS b, alias=test2\
-            \n    Filter: test.a = Int64(1)\
-            \n      TableScan: test";
+        let expected = "Union\
+        \n  SubqueryAlias: test2\
+        \n    Projection: test.a AS b\
+        \n      Filter: test.a = Int64(1)\
+        \n        TableScan: test\
+        \n  SubqueryAlias: test2\
+        \n    Projection: test.a AS b\
+        \n      Filter: test.a = Int64(1)\
+        \n        TableScan: test";
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
@@ -2300,22 +2300,24 @@ mod tests {
             .project(vec![col("b.a")])?
             .build()?;
 
-        let expected_before = "\
-        Projection: b.a\
+        let expected_before = "Projection: b.a\
         \n  Filter: b.a = Int64(1)\
-        \n    Projection: b.a, alias=b\
-        \n      Projection: Int64(0) AS a, alias=b\
-        \n        EmptyRelation";
+        \n    SubqueryAlias: b\
+        \n      Projection: b.a\
+        \n        SubqueryAlias: b\
+        \n          Projection: Int64(0) AS a\
+        \n            EmptyRelation";
         assert_eq!(format!("{:?}", plan), expected_before);
 
         // Ensure that the predicate without any columns (0 = 1) is
         // still there.
-        let expected_after = "\
-        Projection: b.a\
-        \n  Projection: b.a, alias=b\
-        \n    Projection: Int64(0) AS a, alias=b\
-        \n      Filter: Int64(0) = Int64(1)\
-        \n        EmptyRelation";
+        let expected_after = "Projection: b.a\
+        \n  Filter: b.a = Int64(1)\
+        \n    SubqueryAlias: b\
+        \n      Projection: b.a\
+        \n        SubqueryAlias: b\
+        \n          Projection: Int64(0) AS a\
+        \n            EmptyRelation";
         assert_optimized_plan_eq(&plan, expected_after);
 
         Ok(())
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
index e790ce0b7..4004a2b3d 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -110,6 +110,7 @@ mod tests {
     pub struct CustomSource {
         plan: LogicalPlan,
     }
+
     impl CustomSource {
         fn new() -> Self {
             Self {
@@ -120,6 +121,7 @@ mod tests {
             }
         }
     }
+
     impl TableSource for CustomSource {
         fn as_any(&self) -> &dyn std::any::Any {
             self
@@ -156,10 +158,10 @@ mod tests {
             .optimize(&plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
-        let expected = "\
-        Filter: x.a = Int32(1)\
-        \n  Projection: y.a, alias=x\
-        \n    TableScan: y";
+        let expected = "Filter: x.a = Int32(1)\
+        \n  SubqueryAlias: x\
+        \n    Projection: y.a\
+        \n      TableScan: y";
 
         assert_eq!(formatted_plan, expected);
         assert_eq!(plan.schema(), optimized_plan.schema());
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index fd29015d5..9dbda4653 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -141,11 +141,10 @@ impl OptimizerRule for LimitPushDown {
                     input: Arc::new((*projection.input).clone()),
                 });
                 // Push down limit directly (projection doesn't change number of rows)
-                LogicalPlan::Projection(Projection::try_new_with_schema_alias(
+                LogicalPlan::Projection(Projection::try_new_with_schema(
                     projection.expr.clone(),
                     Arc::new(new_input),
                     projection.schema.clone(),
-                    projection.alias.clone(),
                 )?)
             }
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 340d246fe..afa391d41 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -370,7 +370,6 @@ mod tests {
             expr: vec![col("a"), col("b"), col("c")],
             input,
             schema: add_metadata_to_fields(input_schema.as_ref()),
-            alias: None,
         });
 
         // optimizing should be ok, but the schema will have changed  (no metadata)
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index 916071bd5..1e54d7184 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -87,7 +87,6 @@ fn optimize_plan(
             input,
             expr,
             schema,
-            alias,
         }) => {
             // projection:
             // * remove any expression that is not required
@@ -139,16 +138,11 @@ fn optimize_plan(
                 Ok(new_input)
             } else {
                 let metadata = new_input.schema().metadata().clone();
-                Ok(LogicalPlan::Projection(
-                    Projection::try_new_with_schema_alias(
-                        new_expr,
-                        Arc::new(new_input),
-                        DFSchemaRef::new(DFSchema::new_with_metadata(
-                            new_fields, metadata,
-                        )?),
-                        alias.clone(),
-                    )?,
-                ))
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    new_expr,
+                    Arc::new(new_input),
+                    DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?),
+                )?))
             }
         }
         LogicalPlan::Join(Join {
@@ -438,9 +432,7 @@ fn optimize_plan(
 }
 
 fn projection_equal(p: &Projection, p2: &Projection) -> bool {
-    p.expr.len() == p2.expr.len()
-        && p.alias == p2.alias
-        && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
+    p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
 }
 
 fn replace_alias(
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 0e53ecfae..5e5167ab4 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -393,18 +393,20 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]
-    Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]
-      Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-        Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-          TableScan: customer [c_custkey:Int64, c_name:Utf8]
-          Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N]
-            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]
-              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-      Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_2 [o_custkey:Int64, __value:Int64;N]
-        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]
-          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+        \n    Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\
+        \n      Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n        Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n          TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n          SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n            Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
+        \n              Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
+        \n                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n      SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Int64;N]\
+        \n        Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
+        \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
+        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
     }
@@ -440,18 +442,20 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_acctbal < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]
-    Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value, alias=__sq_2 [o_custkey:Int64, __value:Float64;N]
-        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]
-          Filter: orders.o_totalprice < __sq_1.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]
-            Inner Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]
-              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-              Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value, alias=__sq_1 [l_orderkey:Int64, __value:Float64;N]
-                Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]
-                  TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_acctbal < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+        \n    Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Float64;N]\
+        \n        Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value [o_custkey:Int64, __value:Float64;N]\
+        \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]\
+        \n            Filter: orders.o_totalprice < __sq_1.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+        \n              Inner Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\
+        \n                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n                SubqueryAlias: __sq_1 [l_orderkey:Int64, __value:Float64;N]\
+        \n                  Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value [l_orderkey:Int64, __value:Float64;N]\
+        \n                    Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]\
+        \n                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
     }
@@ -476,13 +480,14 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-    TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N]
-      Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]
-        Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n    SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n      Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
+        \n        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
+        \n          Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -505,14 +510,15 @@ mod tests {
             .build()?;
 
         // it will optimize, but fail for the same reason the unoptimized query would
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N]
-        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]
-          Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
+        \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
+        \n            Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
     }
@@ -533,14 +539,15 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N]
-        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]
-          Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
-            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
+        \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
+        \n            Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
+        \n              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -710,14 +717,15 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-    Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-      Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-        TableScan: customer [c_custkey:Int64, c_name:Utf8]
-        Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N]
-          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]
-            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n    Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n      Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n        TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n        SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n          Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
+        \n            Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
+        \n              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -742,13 +750,14 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-    Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value, alias=__sq_1 [o_custkey:Int64, __value:Int64;N]
-        Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]
-          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n    Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\
+        \n        Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\
+        \n          Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\
+        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -803,13 +812,14 @@ mod tests {
             .project(vec![col("test.c")])?
             .build()?;
 
-        let expected = r#"Projection: test.c [c:UInt32]
-  Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]
-    Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]
-      TableScan: test [a:UInt32, b:UInt32, c:UInt32]
-      Projection: sq.a, MIN(sq.c) AS __value, alias=__sq_1 [a:UInt32, __value:UInt32;N]
-        Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]
-          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
+        let expected = "Projection: test.c [c:UInt32]\
+        \n  Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+        \n    Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n      SubqueryAlias: __sq_1 [a:UInt32, __value:UInt32;N]\
+        \n        Projection: sq.a, MIN(sq.c) AS __value [a:UInt32, __value:UInt32;N]\
+        \n          Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]\
+        \n            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -830,13 +840,14 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N]
-        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]
-          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
+        \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
+        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
@@ -856,13 +867,14 @@ mod tests {
             .project(vec![col("customer.c_custkey")])?
             .build()?;
 
-        let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]
-      TableScan: customer [c_custkey:Int64, c_name:Utf8]
-      Projection: MAX(orders.o_custkey) AS __value, alias=__sq_1 [__value:Int64;N]
-        Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]
-          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
+        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
+        \n  Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n    CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\
+        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
+        \n      SubqueryAlias: __sq_1 [__value:Int64;N]\
+        \n        Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\
+        \n          Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\
+        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
 
         assert_optimized_plan_eq(&ScalarSubqueryToJoin::new(), &plan, expected);
         Ok(())
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index c1717da0d..504167d21 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -410,13 +410,15 @@ mod tests {
 
         let expected = "Projection: wrapped.b [b:UInt32]\
         \n  Filter: wrapped.b < UInt32(30) OR wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
-        \n    Subquery: [c:UInt32]\n      Projection: sq_outer.c [c:UInt32]\
+        \n    Subquery: [c:UInt32]\
+        \n      Projection: sq_outer.c [c:UInt32]\
         \n        TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Projection: test.b, test.c, alias=wrapped [b:UInt32, c:UInt32]\
-        \n      LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n        TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
-        \n        Projection: sq_inner.c [c:UInt32]\
-        \n          TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
+        \n    SubqueryAlias: wrapped [b:UInt32, c:UInt32]\
+        \n      Projection: test.b, test.c [b:UInt32, c:UInt32]\
+        \n        LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
+        \n          Projection: sq_inner.c [c:UInt32]\
+        \n            TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index fb27ed5ed..c4911439c 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -63,14 +63,15 @@ fn subquery_filter_with_cast() -> Result<()> {
         AND (cast('2002-05-08' as date) + interval '5 days')\
     )";
     let plan = test_sql(sql)?;
-    let expected =
-        "Projection: test.col_int32\n  Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\
-        \n    CrossJoin:\
-        \n      TableScan: test projection=[col_int32]\
-        \n      Projection: AVG(test.col_int32) AS __value, alias=__sq_1\
-        \n        Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
-        \n          Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\
-        \n            TableScan: test projection=[col_int32, col_utf8]";
+    let expected = "Projection: test.col_int32\
+    \n  Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\
+    \n    CrossJoin:\
+    \n      TableScan: test projection=[col_int32]\
+    \n      SubqueryAlias: __sq_1\
+    \n        Projection: AVG(test.col_int32) AS __value\
+    \n          Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\
+    \n            Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\
+    \n              TableScan: test projection=[col_int32, col_utf8]";
     assert_eq!(expected, format!("{:?}", plan));
     Ok(())
 }
@@ -271,14 +272,14 @@ fn propagate_empty_relation() {
 fn join_keys_in_subquery_alias() {
     let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;";
     let plan = test_sql(sql).unwrap();
-    let expected =  "Projection: a.col_int32, a.col_uint32, a.col_utf8, a.col_date32, a.col_date64, a.col_ts_nano_none, a.col_ts_nano_utc, b.key\
+    let expected = "Projection: a.col_int32, a.col_uint32, a.col_utf8, a.col_date32, a.col_date64, a.col_ts_nano_none, a.col_ts_nano_utc, b.key\
     \n  Inner Join: a.col_int32 = b.key\
     \n    Filter: a.col_int32 IS NOT NULL\
     \n      SubqueryAlias: a\
     \n        TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\
-    \n    Projection: key, alias=b\
-    \n      Projection: test.col_int32 AS key\
-    \n        Filter: test.col_int32 IS NOT NULL\
+    \n    Filter: b.key IS NOT NULL\
+    \n      SubqueryAlias: b\
+    \n        Projection: test.col_int32 AS key\
     \n          TableScan: test projection=[col_int32]";
     assert_eq!(expected, format!("{:?}", plan));
 }
@@ -292,14 +293,15 @@ fn join_keys_in_subquery_alias_1() {
     \n    Filter: a.col_int32 IS NOT NULL\
     \n      SubqueryAlias: a\
     \n        TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\
-    \n    Projection: key, alias=b\
-    \n      Projection: test.col_int32 AS key\
-    \n        Inner Join: test.col_int32 = c.col_int32\
-    \n          Filter: test.col_int32 IS NOT NULL\
-    \n            TableScan: test projection=[col_int32]\
-    \n          Filter: c.col_int32 IS NOT NULL\
-    \n            SubqueryAlias: c\
-    \n              TableScan: test projection=[col_int32]";
+    \n    Filter: b.key IS NOT NULL\
+    \n      SubqueryAlias: b\
+    \n        Projection: test.col_int32 AS key\
+    \n          Inner Join: test.col_int32 = c.col_int32\
+    \n            Filter: test.col_int32 IS NOT NULL\
+    \n              TableScan: test projection=[col_int32]\
+    \n            Filter: c.col_int32 IS NOT NULL\
+    \n              SubqueryAlias: c\
+    \n                TableScan: test projection=[col_int32]";
     assert_eq!(expected, format!("{:?}", plan));
 }
 
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 42d3401ef..c63258858 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -969,28 +969,25 @@ impl AsLogicalPlan for LogicalPlanNode {
                     Ok(node)
                 }
             }
-            LogicalPlan::Projection(Projection {
-                expr, input, alias, ..
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
-                    protobuf::ProjectionNode {
-                        input: Some(Box::new(
-                            protobuf::LogicalPlanNode::try_from_logical_plan(
-                                input.as_ref(),
-                                extension_codec,
-                            )?,
-                        )),
-                        expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
-                            Vec<_>,
-                            to_proto::Error,
-                        >>(
-                        )?,
-                        optional_alias: alias
-                            .clone()
-                            .map(protobuf::projection_node::OptionalAlias::Alias),
-                    },
-                ))),
-            }),
+            LogicalPlan::Projection(Projection { expr, input, .. }) => {
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
+                        protobuf::ProjectionNode {
+                            input: Some(Box::new(
+                                protobuf::LogicalPlanNode::try_from_logical_plan(
+                                    input.as_ref(),
+                                    extension_codec,
+                                )?,
+                            )),
+                            expr: expr
+                                .iter()
+                                .map(|expr| expr.try_into())
+                                .collect::<Result<Vec<_>, to_proto::Error>>()?,
+                            optional_alias: None,
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Filter(filter) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index dda847424..36b45390e 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -45,7 +45,7 @@ use datafusion_common::{
 use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like};
 use datafusion_expr::expr_rewriter::normalize_col;
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
-use datafusion_expr::logical_plan::builder::project_with_alias;
+use datafusion_expr::logical_plan::builder::{project_with_alias, with_alias};
 use datafusion_expr::logical_plan::Join as HashJoin;
 use datafusion_expr::logical_plan::JoinConstraint as HashJoinConstraint;
 use datafusion_expr::logical_plan::{
@@ -857,11 +857,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 (
                     match (cte, self.schema_provider.get_table_provider(table_ref)) {
                         (Some(cte_plan), _) => match table_alias {
-                            Some(cte_alias) => project_with_alias(
-                                cte_plan.clone(),
-                                vec![Expr::Wildcard],
-                                Some(cte_alias),
-                            ),
+                            Some(cte_alias) => {
+                                Ok(with_alias(cte_plan.clone(), cte_alias))
+                            }
                             _ => Ok(cte_plan.clone()),
                         },
                         (_, Ok(provider)) => {
@@ -888,18 +886,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     ctes,
                     outer_query_schema,
                 )?;
-                (
-                    project_with_alias(
-                        logical_plan.clone(),
-                        logical_plan
-                            .schema()
-                            .fields()
-                            .iter()
-                            .map(|field| col(field.name())),
-                        normalized_alias,
-                    )?,
-                    alias,
-                )
+
+                let plan = match normalized_alias {
+                    Some(alias) => with_alias(logical_plan, alias),
+                    _ => logical_plan,
+                };
+                (plan, alias)
             }
             TableFactor::NestedJoin {
                 table_with_joins,
@@ -3209,9 +3201,8 @@ mod tests {
         quick_test(
             "SELECT CAST (a AS FLOAT) FROM (SELECT 1 AS a)",
             "Projection: CAST(a AS Float32)\
-             \n  Projection: a\
-             \n    Projection: Int64(1) AS a\
-             \n      EmptyRelation",
+            \n  Projection: Int64(1) AS a\
+            \n    EmptyRelation",
         );
     }
 
@@ -3411,11 +3402,11 @@ mod tests {
                      ) AS a
                    ) AS b";
         let expected = "Projection: b.fn2, b.last_name\
-                        \n  Projection: fn2, a.last_name, a.birth_date, alias=b\
-                        \n    Projection: a.fn1 AS fn2, a.last_name, a.birth_date\
-                        \n      Projection: fn1, person.last_name, person.birth_date, person.age, alias=a\
-                        \n        Projection: person.first_name AS fn1, person.last_name, person.birth_date, person.age\
-                        \n          TableScan: person";
+        \n  SubqueryAlias: b\
+        \n    Projection: a.fn1 AS fn2, a.last_name, a.birth_date\
+        \n      SubqueryAlias: a\
+        \n        Projection: person.first_name AS fn1, person.last_name, person.birth_date, person.age\
+        \n          TableScan: person";
         quick_test(sql, expected);
     }
 
@@ -3430,11 +3421,11 @@ mod tests {
                    WHERE fn1 = 'X' AND age < 30";
 
         let expected = "Projection: a.fn1, a.age\
-                        \n  Filter: a.fn1 = Utf8(\"X\") AND a.age < Int64(30)\
-                        \n    Projection: fn1, person.age, alias=a\
-                        \n      Projection: person.first_name AS fn1, person.age\
-                        \n        Filter: person.age > Int64(20)\
-                        \n          TableScan: person";
+        \n  Filter: a.fn1 = Utf8(\"X\") AND a.age < Int64(30)\
+        \n    SubqueryAlias: a\
+        \n      Projection: person.first_name AS fn1, person.age\
+        \n        Filter: person.age > Int64(20)\
+        \n          TableScan: person";
 
         quick_test(sql, expected);
     }
@@ -3444,9 +3435,10 @@ mod tests {
         let sql = "SELECT a, b, c
                    FROM lineitem l (a, b, c)";
         let expected = "Projection: l.a, l.b, l.c\
-                        \n  Projection: l.l_item_id AS a, l.l_description AS b, l.price AS c, alias=l\
-                        \n    SubqueryAlias: l\
-                        \n      TableScan: lineitem";
+        \n  SubqueryAlias: l\
+        \n    Projection: l.l_item_id AS a, l.l_description AS b, l.price AS c\
+        \n      SubqueryAlias: l\
+        \n        TableScan: lineitem";
         quick_test(sql, expected);
     }
 
@@ -3804,10 +3796,10 @@ mod tests {
         quick_test(
             "SELECT * FROM (SELECT first_name, last_name FROM person) AS a GROUP BY first_name, last_name",
             "Projection: a.first_name, a.last_name\
-             \n  Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\
-             \n    Projection: person.first_name, person.last_name, alias=a\
-             \n      Projection: person.first_name, person.last_name\
-             \n        TableScan: person",
+            \n  Aggregate: groupBy=[[a.first_name, a.last_name]], aggr=[[]]\
+            \n    SubqueryAlias: a\
+            \n      Projection: person.first_name, person.last_name\
+            \n        TableScan: person",
         );
     }
 
@@ -3873,9 +3865,10 @@ mod tests {
         quick_test(
             "SELECT col1, col2 FROM (VALUES (TIMESTAMP '2021-06-10 17:01:00Z', DATE '2004-04-09')) as t (col1, col2)",
             "Projection: t.col1, t.col2\
-            \n  Projection: t.column1 AS col1, t.column2 AS col2, alias=t\
-            \n    Projection: column1, column2, alias=t\
-            \n      Values: (CAST(Utf8(\"2021-06-10 17:01:00Z\") AS Timestamp(Nanosecond, None)), CAST(Utf8(\"2004-04-09\") AS Date32))",
+            \n  SubqueryAlias: t\
+            \n    Projection: t.column1 AS col1, t.column2 AS col2\
+            \n      SubqueryAlias: t\
+            \n        Values: (CAST(Utf8(\"2021-06-10 17:01:00Z\") AS Timestamp(Nanosecond, None)), CAST(Utf8(\"2004-04-09\") AS Date32))",
         );
     }
 
@@ -4711,17 +4704,17 @@ mod tests {
     fn sorted_union_with_different_types_and_group_by() {
         let sql = "SELECT a FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1";
         let expected = "Sort: a ASC NULLS LAST\
-            \n  Union\
-            \n    Projection: CAST(x.a AS Float64) AS a\
-            \n      Aggregate: groupBy=[[x.a]], aggr=[[]]\
-            \n        Projection: a, alias=x\
-            \n          Projection: Int64(1) AS a\
-            \n            EmptyRelation\
-            \n    Projection: x.a\
-            \n      Aggregate: groupBy=[[x.a]], aggr=[[]]\
-            \n        Projection: a, alias=x\
-            \n          Projection: Float64(1.1) AS a\
-            \n            EmptyRelation";
+        \n  Union\
+        \n    Projection: CAST(x.a AS Float64) AS a\
+        \n      Aggregate: groupBy=[[x.a]], aggr=[[]]\
+        \n        SubqueryAlias: x\
+        \n          Projection: Int64(1) AS a\
+        \n            EmptyRelation\
+        \n    Projection: x.a\
+        \n      Aggregate: groupBy=[[x.a]], aggr=[[]]\
+        \n        SubqueryAlias: x\
+        \n          Projection: Float64(1.1) AS a\
+        \n            EmptyRelation";
         quick_test(sql, expected);
     }
 
@@ -4729,16 +4722,16 @@ mod tests {
     fn union_with_binary_expr_and_cast() {
         let sql = "SELECT cast(0.0 + a as integer) FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT 2.1 + a FROM (select 1 a) x GROUP BY 1)";
         let expected = "Union\
-            \n  Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a\
-            \n    Aggregate: groupBy=[[CAST(Float64(0) + x.a AS Int32)]], aggr=[[]]\
-            \n      Projection: a, alias=x\
-            \n        Projection: Int64(1) AS a\
-            \n          EmptyRelation\
-            \n  Projection: Float64(2.1) + x.a\
-            \n    Aggregate: groupBy=[[Float64(2.1) + x.a]], aggr=[[]]\
-            \n      Projection: a, alias=x\
-            \n        Projection: Int64(1) AS a\
-            \n          EmptyRelation";
+        \n  Projection: CAST(Float64(0) + x.a AS Float64) AS Float64(0) + x.a\
+        \n    Aggregate: groupBy=[[CAST(Float64(0) + x.a AS Int32)]], aggr=[[]]\
+        \n      SubqueryAlias: x\
+        \n        Projection: Int64(1) AS a\
+        \n          EmptyRelation\
+        \n  Projection: Float64(2.1) + x.a\
+        \n    Aggregate: groupBy=[[Float64(2.1) + x.a]], aggr=[[]]\
+        \n      SubqueryAlias: x\
+        \n        Projection: Int64(1) AS a\
+        \n          EmptyRelation";
         quick_test(sql, expected);
     }
 
@@ -4746,16 +4739,16 @@ mod tests {
     fn union_with_aliases() {
         let sql = "SELECT a as a1 FROM (select 1 a) x GROUP BY 1 UNION ALL (SELECT a as a1 FROM (select 1.1 a) x GROUP BY 1)";
         let expected = "Union\
-            \n  Projection: CAST(x.a AS Float64) AS a1\
-            \n    Aggregate: groupBy=[[x.a]], aggr=[[]]\
-            \n      Projection: a, alias=x\
-            \n        Projection: Int64(1) AS a\
-            \n          EmptyRelation\
-            \n  Projection: x.a AS a1\
-            \n    Aggregate: groupBy=[[x.a]], aggr=[[]]\
-            \n      Projection: a, alias=x\
-            \n        Projection: Float64(1.1) AS a\
-            \n          EmptyRelation";
+        \n  Projection: CAST(x.a AS Float64) AS a1\
+        \n    Aggregate: groupBy=[[x.a]], aggr=[[]]\
+        \n      SubqueryAlias: x\
+        \n        Projection: Int64(1) AS a\
+        \n          EmptyRelation\
+        \n  Projection: x.a AS a1\
+        \n    Aggregate: groupBy=[[x.a]], aggr=[[]]\
+        \n      SubqueryAlias: x\
+        \n        Projection: Float64(1.1) AS a\
+        \n          EmptyRelation";
         quick_test(sql, expected);
     }
 
@@ -5503,8 +5496,9 @@ mod tests {
         \n    Subquery:\
         \n      Projection: cte.id, cte.first_name, cte.last_name, cte.age, cte.state, cte.salary, cte.birth_date, cte.😀\
         \n        Filter: cte.id = person.id\
-        \n          Projection: person.id, person.first_name, person.last_name, person.age, person.state, person.salary, person.birth_date, person.😀, alias=cte\
-        \n            TableScan: person\
+        \n          SubqueryAlias: cte\
+        \n            Projection: person.id, person.first_name, person.last_name, person.age, person.state, person.salary, person.birth_date, person.😀\
+        \n              TableScan: person\
         \n    TableScan: person";
 
         quick_test(sql, expected)
@@ -5519,8 +5513,9 @@ mod tests {
         SELECT * FROM numbers;";
 
         let expected = "Projection: numbers.a, numbers.b, numbers.c\
-        \n  Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c, alias=numbers\
-        \n    EmptyRelation";
+        \n  SubqueryAlias: numbers\
+        \n    Projection: Int64(1) AS a, Int64(2) AS b, Int64(3) AS c\
+        \n      EmptyRelation";
 
         quick_test(sql, expected)
     }
@@ -5534,9 +5529,11 @@ mod tests {
         SELECT * FROM numbers;";
 
         let expected = "Projection: numbers.a, numbers.b, numbers.c\
-        \n  Projection: numbers.Int64(1) AS a, numbers.Int64(2) AS b, numbers.Int64(3) AS c, alias=numbers\
-        \n    Projection: Int64(1), Int64(2), Int64(3), alias=numbers\
-        \n      EmptyRelation";
+        \n  SubqueryAlias: numbers\
+        \n    Projection: numbers.Int64(1) AS a, numbers.Int64(2) AS b, numbers.Int64(3) AS c\
+        \n      SubqueryAlias: numbers\
+        \n        Projection: Int64(1), Int64(2), Int64(3)\
+        \n          EmptyRelation";
 
         quick_test(sql, expected)
     }
@@ -5551,9 +5548,11 @@ mod tests {
         SELECT * FROM numbers;";
 
         let expected = "Projection: numbers.a, numbers.b, numbers.c\
-        \n  Projection: numbers.x AS a, numbers.y AS b, numbers.z AS c, alias=numbers\
-        \n    Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z, alias=numbers\
-        \n      EmptyRelation";
+        \n  SubqueryAlias: numbers\
+        \n    Projection: numbers.x AS a, numbers.y AS b, numbers.z AS c\
+        \n      SubqueryAlias: numbers\
+        \n        Projection: Int64(1) AS x, Int64(2) AS y, Int64(3) AS z\
+        \n          EmptyRelation";
 
         quick_test(sql, expected)
     }