You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ja...@apache.org on 2023/04/23 10:32:44 UTC

[arrow-datafusion] branch main updated: fix: incorrect column pruning in sql with window operations (#6039)

This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 72d7db299f fix: incorrect column pruning in sql with window operations (#6039)
72d7db299f is described below

commit 72d7db299f8f4e8c6f58b52c9ecf0b2021562deb
Author: yujie.zhang <yu...@cnosdb.com>
AuthorDate: Sun Apr 23 18:32:38 2023 +0800

    fix: incorrect column pruning in sql with window operations (#6039)
---
 datafusion/core/tests/sql/window.rs                |  16 +-
 .../core/tests/sqllogictests/test_files/window.slt | 275 +++++++++++----------
 datafusion/optimizer/src/push_down_projection.rs   |  76 +++++-
 3 files changed, 229 insertions(+), 138 deletions(-)

diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index ea3a8002a7..5d0fb37991 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -109,11 +109,11 @@ mod tests {
                 "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
                 "  GlobalLimitExec: skip=0, fetch=5",
                 "    SortExec: fetch=5, expr=[inc_col@24 DESC]",
-                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as sum2, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as sum3, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS L [...]
+                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS L [...]
                 "        BoundedWindowAggExec: wdw=[SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
-                "          BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),  [...]
-                "            BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
-            ]
+                "          ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data.inc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data.desc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...]
+                "            BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) [...]
+                "              BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
         };
 
         let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -185,7 +185,7 @@ mod tests {
                 "ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]",
                 "  GlobalLimitExec: skip=0, fetch=5",
                 "    SortExec: fetch=5, expr=[ts@24 DESC]",
-                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as fv2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv1, LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
+                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
                 "        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_order [...]
                 "          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ord [...]
             ]
@@ -245,7 +245,7 @@ mod tests {
                 "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
                 "  GlobalLimitExec: skip=0, fetch=5",
                 "    SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum2, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@9 as min1, MIN(annotated_data.inc_col) ORDER BY [annotate [...]
+                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data.inc_col) ORDER BY [annotate [...]
                 "        BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), fr [...]
                 "          BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),  [...]
             ]
@@ -300,7 +300,7 @@ mod tests {
                 "ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]",
                 "  GlobalLimitExec: skip=0, fetch=5",
                 "    SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as first_value1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as first_value2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as last_value1, LAS [...]
+                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
                 "        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_orde [...]
                 "          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_or [...]
             ]
@@ -354,7 +354,7 @@ mod tests {
             vec![
                 "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]",
                 "  GlobalLimitExec: skip=0, fetch=5",
-                "    ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@6 as count1, COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
+                "    ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
                 "      BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
                 "        BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
             ]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 0170eed929..04b1877e10 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -1209,14 +1209,16 @@ EXPLAIN SELECT
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
   WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+        TableScan: aggregate_test_100 projection=[c8, c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]
+ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]
   BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
-      SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]
-        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+    ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c8, c9]
 
 # over_order_by_sort_keys_sorting_prefix_compacting
 
@@ -1229,14 +1231,14 @@ Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY [aggregat
   WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
     WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
       WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
-ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as MIN(aggregat [...]
+ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_t [...]
   WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
     BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
       BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
-        SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
 
 # FIXME: for now we are not detecting prefix of sorting keys in order to re-arrange with global and save one SortExec
 # over_order_by_sort_keys_sorting_global_order_compacting
@@ -1251,16 +1253,16 @@ Sort: aggregate_test_100.c2 ASC NULLS LAST
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
       WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
         WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
 SortExec: expr=[c2@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PREC [...]
+  ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
     WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
       BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
-        SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+        SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
           BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
-            SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+            SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
 
 # test_window_partition_by_order_by
 statement ok
@@ -1275,20 +1277,22 @@ EXPLAIN SELECT
 logical_plan
 Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
   WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+    Projection: aggregate_test_100.c1, aggregate_test_100.c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
-ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]
+ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as COUNT(UInt8(1))]
   BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
     SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
       CoalesceBatchesExec: target_batch_size=4096
         RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
-            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-              CoalesceBatchesExec: target_batch_size=4096
-                RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
-                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4)]
+            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
+              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+                CoalesceBatchesExec: target_batch_size=4096
+                  RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
+                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c4]
 
 # test_window_agg_sort_reversed_plan
 # Only 1 SortExec was added
@@ -1305,14 +1309,14 @@ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregat
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]
+ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-        SortExec: expr=[c9@8 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
 
 query III
 SELECT
@@ -1346,14 +1350,14 @@ Projection: aggregate_test_100.c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
       WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,I [...]
+ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,Int6 [...]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_orde [...]
       BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_or [...]
-        SortExec: expr=[c9@8 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
 
 query IIIIIII
 SELECT
@@ -1389,15 +1393,15 @@ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as rn2]
+ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-      SortExec: expr=[c9@8 ASC NULLS LAST]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
         BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-          SortExec: expr=[c9@8 DESC]
-            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          SortExec: expr=[c9@0 DESC]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
 
 
 query III
@@ -1431,16 +1435,16 @@ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregat
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
         WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          TableScan: aggregate_test_100 projection=[c1, c2, c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECED [...]
+ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDIN [...]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-      SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+      SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
         BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
           BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-            SortExec: expr=[c9@8 DESC,c1@0 DESC]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+            SortExec: expr=[c9@2 DESC,c1@0 DESC]
+              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9]
 
 query IIII
 SELECT
@@ -1505,31 +1509,33 @@ logical_plan
 Projection: SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS d, SUM(null_cases.c1) ORDER BY [null_cases.c [...]
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-            WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-              WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-                WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-                  WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-                    TableScan: null_cases projection=[c1, c2, c3]
+      Projection: null_cases.c1, null_cases.c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED [...]
+        WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+            WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+              WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+                WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+                  WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+                    WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+                      TableScan: null_cases projection=[c1, c2, c3]
 physical_plan
-ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(null_cases.c1) O [...]
+ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@3 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11 as d, SUM(null_cases.c1) O [...]
   GlobalLimitExec: skip=0, fetch=5
     WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
-      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
-        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
-            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
-              BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
-                SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
-                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRo [...]
-                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Precedi [...]
-                      SortExec: expr=[c3@2 DESC NULLS LAST]
-                        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Pre [...]
-                          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
-                            SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
-                              CsvExec: files={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
+      ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(null_cases.c1), SUM(null [...]
+        BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+          SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+            BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+              SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
+                BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+                  SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
+                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Current [...]
+                      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prece [...]
+                        SortExec: expr=[c3@2 DESC NULLS LAST]
+                          WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: P [...]
+                            BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+                              SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
+                                CsvExec: files={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
 
 query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -1597,14 +1603,14 @@ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregat
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
 
 
 query III
@@ -1641,14 +1647,14 @@ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggr
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
-ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]
+ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
 
 query III
 SELECT
@@ -1684,15 +1690,17 @@ logical_plan
 Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+      Projection: aggregate_test_100.c3, aggregate_test_100.c4, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
-ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
   GlobalLimitExec: skip=0, fetch=5
     WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
-        SortExec: expr=[c3@2 + c4@3 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+      ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+          SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c4, c9]
 
 query III
 SELECT c3,
@@ -1776,20 +1784,22 @@ Limit: skip=0, fetch=5
   Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
     Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        Projection: aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+            TableScan: aggregate_test_100 projection=[c2, c3, c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
   SortPreservingMergeExec: [c3@0 ASC NULLS LAST]
-    ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+    ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
-        SortExec: expr=[c3@2 ASC NULLS LAST,c9@8 DESC]
+        SortExec: expr=[c3@0 ASC NULLS LAST,c9@1 DESC]
           CoalesceBatchesExec: target_batch_size=4096
-            RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 2), input_partitions=2
+            RepartitionExec: partitioning=Hash([Column { name: "c3", index: 0 }], 2), input_partitions=2
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
-                  SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
-                    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+                ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)]
+                  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+                    SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
+                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c9]
 
 
 query III
@@ -1818,16 +1828,16 @@ logical_plan
 Sort: aggregate_test_100.c1 ASC NULLS LAST
   Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
     WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+      TableScan: aggregate_test_100 projection=[c1]
 physical_plan
 SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@13 as rn1]
+  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
     BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
       SortExec: expr=[c1@0 ASC NULLS LAST]
         CoalesceBatchesExec: target_batch_size=4096
           RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
 
 query TI
 SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC
@@ -1944,17 +1954,17 @@ logical_plan
 Sort: aggregate_test_100.c1 ASC NULLS LAST
   Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
     WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+      TableScan: aggregate_test_100 projection=[c1]
 physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
   CoalescePartitionsExec
-    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@13 as rn1]
+    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
       BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
         SortExec: expr=[c1@0 ASC NULLS LAST]
           CoalesceBatchesExec: target_batch_size=4096
             RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
 
 statement ok
 set datafusion.optimizer.repartition_sorts = true;
@@ -1971,19 +1981,19 @@ Sort: aggregate_test_100.c1 ASC NULLS LAST
   Projection: aggregate_test_100.c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
     WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-      SortPreservingMergeExec: [c9@8 ASC NULLS LAST]
-        SortExec: expr=[c9@8 ASC NULLS LAST]
+      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]
+        SortExec: expr=[c9@1 ASC NULLS LAST]
           BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]
-            SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
               CoalesceBatchesExec: target_batch_size=4096
                 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+                    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
 
 # test_window_agg_with_global_limit
 statement ok
@@ -2058,20 +2068,22 @@ Limit: skip=0, fetch=5
   Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
     Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
       WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-              TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM [...]
+          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+              WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+                TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
   SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
-    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test [...]
+    ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_1 [...]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-            WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]
-                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+        ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLO [...]
+          WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+              WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+                SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
+                  CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
 
 
 query IIIII
@@ -2108,23 +2120,28 @@ logical_plan
 Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAS [...]
   Limit: skip=0, fetch=5
     WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-        WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-            SubqueryAlias: t1
-              Sort: aggregate_test_100.c9 ASC NULLS LAST
-                Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
-                  TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+      Projection: t1.c2, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING
+        WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+          Projection: t1.c2, t1.c8, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING
+            WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+              WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+                SubqueryAlias: t1
+                  Sort: aggregate_test_100.c9 ASC NULLS LAST
+                    Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+                      TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
 physical_plan
-ProjectionExec: expr=[c9@3 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@8 as sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [...]
+ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [...]
   GlobalLimitExec: skip=0, fetch=5
     BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-      WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-        BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
-          WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
-              ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
-                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
+      ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND  [...]
+        WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+          SortExec: expr=[c2@0 ASC NULLS LAST,c1_alias@3 ASC NULLS LAST,c9@2 ASC NULLS LAST,c8@1 ASC NULLS LAST]
+            ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9, c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(t1.c9)]
+              BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+                WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+                  SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
+                    ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
+                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
 
 
 query IIIII
@@ -2157,17 +2174,19 @@ Projection: sum1, sum2
     Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
       Projection: SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING AS sum2, aggregate_test_100.c9
         WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING]]
-          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-            TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          Projection: aggregate_test_100.c1, aggregate_test_100.c9, aggregate_test_100.c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING
+            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+              TableScan: aggregate_test_100 projection=[c1, c2, c9, c12]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
   GlobalLimitExec: skip=0, fetch=5
     SortExec: fetch=5, expr=[c9@2 ASC NULLS LAST]
-      ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@14 as sum2, c9@8 as c9]
+      ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@4 as sum2, c9@1 as c9]
         BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(5)), end_bound: Preceding(UInt64(3)) }]
-          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
-            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+          ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)]
+            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
+              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9, c12]
 
 query RR
 SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1,
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 97ba5a92d7..5c6f10825a 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -312,11 +312,28 @@ impl OptimizerRule for PushDownProjection {
 
                 if new_window_expr.is_empty() {
                     // none columns in window expr are needed, remove the window expr
-                    let new_window = window.input.as_ref().clone();
+                    let input = window.input.clone();
+                    let new_window = restrict_outputs(input.clone(), &required_columns)?
+                        .unwrap_or((*input).clone());
 
                     generate_plan!(projection_is_empty, plan, new_window)
                 } else {
-                    let new_window = LogicalPlanBuilder::from((*(window.input)).clone())
+                    let mut referenced_inputs = HashSet::new();
+                    exprlist_to_columns(&new_window_expr, &mut referenced_inputs)?;
+                    window
+                        .input
+                        .schema()
+                        .fields()
+                        .iter()
+                        .filter(|f| required_columns.contains(&f.qualified_column()))
+                        .for_each(|f| {
+                            referenced_inputs.insert(f.qualified_column());
+                        });
+
+                    let input = window.input.clone();
+                    let new_input = restrict_outputs(input.clone(), &referenced_inputs)?
+                        .unwrap_or((*input).clone());
+                    let new_window = LogicalPlanBuilder::from(new_input)
                         .window(new_window_expr)?
                         .build()?;
 
@@ -553,6 +570,21 @@ fn push_down_scan(
     }))
 }
 
+fn restrict_outputs(
+    plan: Arc<LogicalPlan>,
+    permitted_outputs: &HashSet<Column>,
+) -> Result<Option<LogicalPlan>> {
+    let schema = plan.schema();
+    if permitted_outputs.len() == schema.fields().len() {
+        return Ok(None);
+    }
+    Ok(Some(generate_projection(
+        permitted_outputs,
+        schema,
+        plan.clone(),
+    )?))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -564,12 +596,15 @@ mod tests {
     use datafusion_common::DFSchema;
     use datafusion_expr::expr;
     use datafusion_expr::expr::Cast;
+    use datafusion_expr::WindowFrame;
+    use datafusion_expr::WindowFunction;
     use datafusion_expr::{
         col, count, lit,
         logical_plan::{builder::LogicalPlanBuilder, table_scan, JoinType},
         max, min, AggregateFunction, Expr,
     };
     use std::collections::HashMap;
+    use std::vec;
 
     #[test]
     fn aggregate_no_group_by() -> Result<()> {
@@ -1053,6 +1088,43 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected)
     }
 
+    #[test]
+    fn test_window() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let max1 = Expr::WindowFunction(expr::WindowFunction::new(
+            WindowFunction::AggregateFunction(AggregateFunction::Max),
+            vec![col("test.a")],
+            vec![col("test.b")],
+            vec![],
+            WindowFrame::new(false),
+        ));
+
+        let max2 = Expr::WindowFunction(expr::WindowFunction::new(
+            WindowFunction::AggregateFunction(AggregateFunction::Max),
+            vec![col("test.b")],
+            vec![],
+            vec![],
+            WindowFrame::new(false),
+        ));
+        let col1 = col(max1.display_name()?);
+        let col2 = col(max2.display_name()?);
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .window(vec![max1])?
+            .window(vec![max2])?
+            .project(vec![col1, col2])?
+            .build()?;
+
+        let expected = "Projection: MAX(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MAX(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+        \n  WindowAggr: windowExpr=[[MAX(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+        \n    Projection: test.b, MAX(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+        \n      WindowAggr: windowExpr=[[MAX(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+        \n        TableScan: test projection=[a, b]";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
         let optimized_plan = optimize(plan).expect("failed to optimize plan");
         let formatted_plan = format!("{optimized_plan:?}");