You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/15 14:24:17 UTC

[arrow-datafusion] branch main updated: Fix explain plan formatting in sqllogictest (#6329)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 62621eef47 Fix explain plan formatting in sqllogictest (#6329)
62621eef47 is described below

commit 62621eef47803110728fad41774e01d587c0ed9e
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon May 15 10:24:11 2023 -0400

    Fix explain plan formatting in sqllogictest (#6329)
    
    * Fix explain plan formatting in sqllogictest
    
    * Use `-` to ensure plan indent is maintained
    
    * Refine
    
    * Update tets
---
 .../src/engines/datafusion/normalize.rs            |   15 +-
 .../core/tests/sqllogictests/test_files/errors.slt |    1 -
 .../tests/sqllogictests/test_files/explain.slt     |   24 +-
 .../tests/sqllogictests/test_files/groupby.slt     |   16 +-
 .../core/tests/sqllogictests/test_files/json.slt   |   10 +-
 .../core/tests/sqllogictests/test_files/order.slt  |    8 +-
 .../core/tests/sqllogictests/test_files/select.slt |   12 +-
 .../core/tests/sqllogictests/test_files/union.slt  |  220 ++---
 .../core/tests/sqllogictests/test_files/window.slt | 1012 ++++++++++----------
 9 files changed, 664 insertions(+), 654 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
index 20cd2331a4..6dd4e17d7d 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs
@@ -74,7 +74,7 @@ pub fn convert_batches(batches: Vec<RecordBatch>) -> Result<Vec<Vec<String>>> {
 ///   "Sort: d.b ASC NULLS LAST",
 /// ]
 /// [ <--- newly added row
-///   "  Projection: d.b, MAX(d.a) AS max_a",
+///   "|-- Projection: d.b, MAX(d.a) AS max_a",
 /// ]
 /// ```
 fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> {
@@ -92,7 +92,18 @@ fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> {
         }
 
         // form new rows with each additional line
-        let new_lines: Vec<_> = lines.into_iter().map(|l| vec![l.to_string()]).collect();
+        let new_lines: Vec<_> = lines
+            .into_iter()
+            .map(|l| {
+                // replace any leading spaces with '-' as
+                // `sqllogictest` ignores whitespace differences
+                //
+                // See https://github.com/apache/arrow-datafusion/issues/6328
+                let content = l.trim_start();
+                let new_prefix = "-".repeat(l.len() - content.len());
+                vec![format!("{new_prefix}{content}")]
+            })
+            .collect();
 
         Either::Right(once(row).chain(new_lines.into_iter()))
     } else {
diff --git a/datafusion/core/tests/sqllogictests/test_files/errors.slt b/datafusion/core/tests/sqllogictests/test_files/errors.slt
index ee934f4618..b8f650d61e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/errors.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/errors.slt
@@ -72,4 +72,3 @@ SELECT COUNT(*) FROM nonexistentcatalog.public.aggregate_test_100
 
 statement error Error during planning: Unsupported compound identifier '\[Ident \{ value: "way", quote_style: None \}, Ident \{ value: "too", quote_style: None \}, Ident \{ value: "many", quote_style: None \}, Ident \{ value: "namespaces", quote_style: None \}, Ident \{ value: "as", quote_style: None \}, Ident \{ value: "ident", quote_style: None \}, Ident \{ value: "prefixes", quote_style: None \}, Ident \{ value: "aggregate_test_100", quote_style: None \}\]'
 SELECT COUNT(*) FROM way.too.many.namespaces.as.ident.prefixes.aggregate_test_100
-
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 65b72d0c9a..c64eaf62d3 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -40,14 +40,14 @@ explain SELECT c1 FROM aggregate_test_100 where c2 > 10
 ----
 logical_plan
 Projection: aggregate_test_100.c1
-  Filter: aggregate_test_100.c2 > Int8(10)
-    TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]
+--Filter: aggregate_test_100.c2 > Int8(10)
+----TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]
 physical_plan
 ProjectionExec: expr=[c1@0 as c1]
-  CoalesceBatchesExec: target_batch_size=8192
-    FilterExec: c2@1 > 10
-      RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: c2@1 > 10
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
 
 # explain_csv_exec_scan_config
 
@@ -77,11 +77,11 @@ explain SELECT c1 FROM aggregate_test_100_with_order order by c1 ASC limit 10
 ----
 logical_plan
 Limit: skip=0, fetch=10
-  Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
-    TableScan: aggregate_test_100_with_order projection=[c1]
+--Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
+----TableScan: aggregate_test_100_with_order projection=[c1]
 physical_plan
 GlobalLimitExec: skip=0, fetch=10
-  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
 
 
 ## explain_physical_plan_only
@@ -94,7 +94,7 @@ EXPLAIN select count(*) from (values ('a', 1, 100), ('a', 2, 150)) as t (c1,c2,c
 ----
 physical_plan
 ProjectionExec: expr=[2 as COUNT(UInt8(1))]
-  EmptyExec: produce_one_row=true
+--EmptyExec: produce_one_row=true
 
 statement ok
 set datafusion.explain.physical_plan_only = false
@@ -139,7 +139,7 @@ EXPLAIN VERBOSE SELECT a, b, c FROM simple_explain_test
 ----
 initial_logical_plan
 Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c
-  TableScan: simple_explain_test
+--TableScan: simple_explain_test
 logical_plan after inline_table_scan SAME TEXT AS ABOVE
 logical_plan after type_coercion SAME TEXT AS ABOVE
 logical_plan after count_wildcard_rule SAME TEXT AS ABOVE
@@ -170,7 +170,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
 logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
 logical_plan after push_down_projection
 Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c
-  TableScan: simple_explain_test projection=[a, b, c]
+--TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after eliminate_projection TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after push_down_limit SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index b565b0dc9a..b9d2543e11 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1954,12 +1954,12 @@ EXPLAIN SELECT a, b,
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotated_data_infinite2.c) AS summation1
-  Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
-    TableScan: annotated_data_infinite2 projection=[a, b, c]
+--Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
-  AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
-    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query III
@@ -1984,12 +1984,12 @@ EXPLAIN SELECT a, d,
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) AS summation1
-  Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
-    TableScan: annotated_data_infinite2 projection=[a, c, d]
+--Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+----TableScan: annotated_data_infinite2 projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c)@2 as summation1]
-  AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallyOrdered
-    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
+--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallyOrdered
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, d,
diff --git a/datafusion/core/tests/sqllogictests/test_files/json.slt b/datafusion/core/tests/sqllogictests/test_files/json.slt
index dc0dd39b2c..7092127a79 100644
--- a/datafusion/core/tests/sqllogictests/test_files/json.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
 ----
 logical_plan
 Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
-  TableScan: json_test projection=[a]
+--TableScan: json_test projection=[a]
 physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
-  CoalescePartitionsExec
-    AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
-      RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-        JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
 
 query error DataFusion error: Schema error: No field named mycol\.
 SELECT mycol FROM single_nan
diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt
index 3b77319c0c..92faff623c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/order.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/order.slt
@@ -160,12 +160,12 @@ explain SELECT c1, c2 FROM aggregate_test_100 ORDER BY c2, c3, c2
 ----
 logical_plan
 Projection: aggregate_test_100.c1, aggregate_test_100.c2
-  Sort: aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST
-    TableScan: aggregate_test_100 projection=[c1, c2, c3]
+--Sort: aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST
+----TableScan: aggregate_test_100 projection=[c1, c2, c3]
 physical_plan
 ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]
-  SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+--SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST]
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
 
 query II
 SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index 440e7d8010..03f96bad95 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -448,11 +448,11 @@ EXPLAIN SELECT c1 BETWEEN 2 AND 3 FROM select_between_data
 ----
 logical_plan
 Projection: select_between_data.c1 >= Int64(2) AND select_between_data.c1 <= Int64(3) AS select_between_data.c1 BETWEEN Int64(2) AND Int64(3)
-  TableScan: select_between_data projection=[c1]
+--TableScan: select_between_data projection=[c1]
 physical_plan
 ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as select_between_data.c1 BETWEEN Int64(2) AND Int64(3)]
-  RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-    MemoryExec: partitions=1, partition_sizes=[1]
+--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 
 # TODO: query_get_indexed_field
@@ -714,11 +714,11 @@ EXPLAIN SELECT a FROM annotated_data_finite2
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: annotated_data_finite2.a ASC NULLS LAST, fetch=5
-    TableScan: annotated_data_finite2 projection=[a]
+--Sort: annotated_data_finite2.a ASC NULLS LAST, fetch=5
+----TableScan: annotated_data_finite2 projection=[a]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
 
 query I
 SELECT a FROM annotated_data_finite2
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index a9e247d478..9015996aed 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -191,44 +191,44 @@ UNION ALL
 ----
 logical_plan
 Union
-  LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name
-    Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]]
-      TableScan: t1 projection=[id, name]
-    TableScan: t2 projection=[id, name]
-  Projection: CAST(t2.id AS Int32) AS id, t2.name
-    LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name
-      Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]]
-        TableScan: t2 projection=[id, name]
-      TableScan: t1 projection=[id, name]
+--LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name
+----Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]]
+------TableScan: t1 projection=[id, name]
+----TableScan: t2 projection=[id, name]
+--Projection: CAST(t2.id AS Int32) AS id, t2.name
+----LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name
+------Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]]
+--------TableScan: t2 projection=[id, name]
+------TableScan: t1 projection=[id, name]
 physical_plan
 UnionExec
-  ProjectionExec: expr=[id@0 as id, name@1 as name]
-    CoalesceBatchesExec: target_batch_size=8192
-      HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "id", index: 0 }, Column { name: "CAST(t2.id AS Int32)", index: 2 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
-        AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-          CoalesceBatchesExec: target_batch_size=8192
-            RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
-              AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-                MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
-            ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-  ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
-    ProjectionExec: expr=[id@0 as id, name@1 as name]
-      CoalesceBatchesExec: target_batch_size=8192
-        HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "id", index: 0 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
-          CoalesceBatchesExec: target_batch_size=8192
-            RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
-              ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-                AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-                  CoalesceBatchesExec: target_batch_size=8192
-                    RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
-                      AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-                        MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-          CoalesceBatchesExec: target_batch_size=8192
-            RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
-              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[id@0 as id, name@1 as name]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "id", index: 0 }, Column { name: "CAST(t2.id AS Int32)", index: 2 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
+----ProjectionExec: expr=[id@0 as id, name@1 as name]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "id", index: 0 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+--------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+----------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+----------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+------------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query IT rowsort
 (
@@ -263,36 +263,36 @@ UNION ALL
 ----
 logical_plan
 Union
-  LeftAnti Join: t1.name = t2.name
-    Aggregate: groupBy=[[t1.name]], aggr=[[]]
-      TableScan: t1 projection=[name]
-    TableScan: t2 projection=[name]
-  LeftAnti Join: t2.name = t1.name
-    Aggregate: groupBy=[[t2.name]], aggr=[[]]
-      TableScan: t2 projection=[name]
-    TableScan: t1 projection=[name]
+--LeftAnti Join: t1.name = t2.name
+----Aggregate: groupBy=[[t1.name]], aggr=[[]]
+------TableScan: t1 projection=[name]
+----TableScan: t2 projection=[name]
+--LeftAnti Join: t2.name = t1.name
+----Aggregate: groupBy=[[t2.name]], aggr=[[]]
+------TableScan: t2 projection=[name]
+----TableScan: t1 projection=[name]
 physical_plan
-  InterleaveExec
-    CoalesceBatchesExec: target_batch_size=8192
-      HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
-        AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-          CoalesceBatchesExec: target_batch_size=8192
-            RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-                MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-            MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-    CoalesceBatchesExec: target_batch_size=8192
-      HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
-        AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-          CoalesceBatchesExec: target_batch_size=8192
-            RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-                MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-            MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+InterleaveExec
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 # union_upcast_types
 query TT
@@ -300,24 +300,24 @@ explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggre
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
-    Union
-      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9
-        TableScan: aggregate_test_100 projection=[c1, c9]
-      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9
-        TableScan: aggregate_test_100 projection=[c1, c3]
+--Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+----Union
+------Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9
+--------TableScan: aggregate_test_100 projection=[c1, c9]
+------Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9
+--------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortPreservingMergeExec: [c9@1 DESC]
-    UnionExec
-      SortExec: expr=[c9@1 DESC]
-        ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9]
-          RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
-      SortExec: expr=[c9@1 DESC]
-        ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9]
-          RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+--SortPreservingMergeExec: [c9@1 DESC]
+----UnionExec
+------SortExec: expr=[c9@1 DESC]
+--------ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+------SortExec: expr=[c9@1 DESC]
+--------ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
 
 query TI
 SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
@@ -339,26 +339,26 @@ SELECT count(*) FROM (
 ----
 logical_plan
 Projection: COUNT(UInt8(1))
-  Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]]
-    Union
-      Aggregate: groupBy=[[t1.name]], aggr=[[]]
-        TableScan: t1 projection=[name]
-      Aggregate: groupBy=[[t2.name]], aggr=[[]]
-        TableScan: t2 projection=[name]
+--Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]]
+----Union
+------Aggregate: groupBy=[[t1.name]], aggr=[[]]
+--------TableScan: t1 projection=[name]
+------Aggregate: groupBy=[[t2.name]], aggr=[[]]
+--------TableScan: t2 projection=[name]
 physical_plan
 ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
-  AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
-    InterleaveExec
-      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
-            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
+----InterleaveExec
+------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+--------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 
 ########
@@ -429,20 +429,20 @@ UNION ALL
 ORDER BY c1
 ----
 logical_plan
-  Sort: t1.c1 ASC NULLS LAST
-    Union
-      TableScan: t1 projection=[c1]
-      Projection: t2.c1a AS t1.c1
-        TableScan: t2 projection=[c1a]
+Sort: t1.c1 ASC NULLS LAST
+--Union
+----TableScan: t1 projection=[c1]
+----Projection: t2.c1a AS t1.c1
+------TableScan: t2 projection=[c1a]
 physical_plan
-  SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
-    UnionExec
-      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
-        ProjectionExec: expr=[c1a@0 as t1.c1]
-      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a], output_ordering=[c1a@0 ASC NULLS LAST], has_header=true
+SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
+--UnionExec
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+----ProjectionExec: expr=[c1a@0 as t1.c1]
+------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a], output_ordering=[c1a@0 ASC NULLS LAST], has_header=true
 
 statement ok
 drop table t1
 
 statement ok
-drop table t2
\ No newline at end of file
+drop table t2
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 02c5e6cdb0..80423a5fe2 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -254,38 +254,38 @@ WITH _sample_data AS (
 ----
 logical_plan
 Sort: d.b ASC NULLS LAST
-  Projection: d.b, MAX(d.a) AS max_a
-    Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]]
-      SubqueryAlias: d
-        SubqueryAlias: _data2
-          SubqueryAlias: s
-            SubqueryAlias: _sample_data
-              Union
-                Projection: Int64(1) AS a, Utf8("aa") AS b
-                  EmptyRelation
-                Projection: Int64(3) AS a, Utf8("aa") AS b
-                  EmptyRelation
-                Projection: Int64(5) AS a, Utf8("bb") AS b
-                  EmptyRelation
-                Projection: Int64(7) AS a, Utf8("bb") AS b
-                  EmptyRelation
+--Projection: d.b, MAX(d.a) AS max_a
+----Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a)]]
+------SubqueryAlias: d
+--------SubqueryAlias: _data2
+----------SubqueryAlias: s
+------------SubqueryAlias: _sample_data
+--------------Union
+----------------Projection: Int64(1) AS a, Utf8("aa") AS b
+------------------EmptyRelation
+----------------Projection: Int64(3) AS a, Utf8("aa") AS b
+------------------EmptyRelation
+----------------Projection: Int64(5) AS a, Utf8("bb") AS b
+------------------EmptyRelation
+----------------Projection: Int64(7) AS a, Utf8("bb") AS b
+------------------EmptyRelation
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
-  SortExec: expr=[b@0 ASC NULLS LAST]
-    ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a]
-      AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
-        CoalesceBatchesExec: target_batch_size=8192
-          RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 4), input_partitions=4
-            AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
-              UnionExec
-                ProjectionExec: expr=[1 as a, aa as b]
-                  EmptyExec: produce_one_row=true
-                ProjectionExec: expr=[3 as a, aa as b]
-                  EmptyExec: produce_one_row=true
-                ProjectionExec: expr=[5 as a, bb as b]
-                  EmptyExec: produce_one_row=true
-                ProjectionExec: expr=[7 as a, bb as b]
-                  EmptyExec: produce_one_row=true
+--SortExec: expr=[b@0 ASC NULLS LAST]
+----ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a]
+------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 4), input_partitions=4
+------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[MAX(d.a)]
+--------------UnionExec
+----------------ProjectionExec: expr=[1 as a, aa as b]
+------------------EmptyExec: produce_one_row=true
+----------------ProjectionExec: expr=[3 as a, aa as b]
+------------------EmptyExec: produce_one_row=true
+----------------ProjectionExec: expr=[5 as a, bb as b]
+------------------EmptyExec: produce_one_row=true
+----------------ProjectionExec: expr=[7 as a, bb as b]
+------------------EmptyExec: produce_one_row=true
 
 # Check actual result:
 query TI
@@ -337,41 +337,41 @@ WITH _sample_data AS (
 ----
 logical_plan
 Sort: d.b ASC NULLS LAST
-  Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq)
-    Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]]
-      SubqueryAlias: d
-        SubqueryAlias: _data2
-          Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b
-            WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-              SubqueryAlias: s
-                SubqueryAlias: _sample_data
-                  Union
-                    Projection: Int64(1) AS a, Utf8("aa") AS b
-                      EmptyRelation
-                    Projection: Int64(3) AS a, Utf8("aa") AS b
-                      EmptyRelation
-                    Projection: Int64(5) AS a, Utf8("bb") AS b
-                      EmptyRelation
-                    Projection: Int64(7) AS a, Utf8("bb") AS b
-                      EmptyRelation
+--Projection: d.b, MAX(d.a) AS max_a, MAX(d.seq)
+----Aggregate: groupBy=[[d.b]], aggr=[[MAX(d.a), MAX(d.seq)]]
+------SubqueryAlias: d
+--------SubqueryAlias: _data2
+----------Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b
+------------WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------------SubqueryAlias: s
+----------------SubqueryAlias: _sample_data
+------------------Union
+--------------------Projection: Int64(1) AS a, Utf8("aa") AS b
+----------------------EmptyRelation
+--------------------Projection: Int64(3) AS a, Utf8("aa") AS b
+----------------------EmptyRelation
+--------------------Projection: Int64(5) AS a, Utf8("bb") AS b
+----------------------EmptyRelation
+--------------------Projection: Int64(7) AS a, Utf8("bb") AS b
+----------------------EmptyRelation
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
-  ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
-    AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], ordering_mode=FullyOrdered
-      ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
-        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-          SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
-            CoalesceBatchesExec: target_batch_size=8192
-              RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
-                UnionExec
-                  ProjectionExec: expr=[1 as a, aa as b]
-                    EmptyExec: produce_one_row=true
-                  ProjectionExec: expr=[3 as a, aa as b]
-                    EmptyExec: produce_one_row=true
-                  ProjectionExec: expr=[5 as a, bb as b]
-                    EmptyExec: produce_one_row=true
-                  ProjectionExec: expr=[7 as a, bb as b]
-                    EmptyExec: produce_one_row=true
+--ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)]
+----AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], ordering_mode=FullyOrdered
+------ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4
+----------------UnionExec
+------------------ProjectionExec: expr=[1 as a, aa as b]
+--------------------EmptyExec: produce_one_row=true
+------------------ProjectionExec: expr=[3 as a, aa as b]
+--------------------EmptyExec: produce_one_row=true
+------------------ProjectionExec: expr=[5 as a, bb as b]
+--------------------EmptyExec: produce_one_row=true
+------------------ProjectionExec: expr=[7 as a, bb as b]
+--------------------EmptyExec: produce_one_row=true
 
 
 # check actual result
@@ -1207,17 +1207,17 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
-  WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        TableScan: aggregate_test_100 projection=[c8, c9]
+--WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------TableScan: aggregate_test_100 projection=[c8, c9]
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]
-  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-    ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, c9], has_header=true
+--BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, c9], has_header=true
 
 
 # over_order_by_sort_keys_sorting_prefix_compacting
@@ -1228,17 +1228,17 @@ EXPLAIN SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (OR
 ----
 logical_plan
 Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-  WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-    WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-      WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        TableScan: aggregate_test_100 projection=[c2, c9]
+--WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
 ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_t [...]
-  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
-    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
+--WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+----BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
 
 
 # FIXME: for now we are not detecting prefix of sorting keys in order to re-arrange with global and save one SortExec
@@ -1250,20 +1250,20 @@ EXPLAIN SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), MIN(c9) OVER
 ----
 logical_plan
 Sort: aggregate_test_100.c2 ASC NULLS LAST
-  Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-      WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c2, c9]
+--Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
 SortExec: expr=[c2@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
-    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
-      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
-          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-            SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
-              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
+--ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
+----WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
+----------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------------SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
 
 # test_window_partition_by_order_by
 statement ok
@@ -1277,23 +1277,23 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
-  WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-    Projection: aggregate_test_100.c1, aggregate_test_100.c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c2, c4]
+--WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----Projection: aggregate_test_100.c1, aggregate_test_100.c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
 ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as COUNT(UInt8(1))]
-  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-      CoalesceBatchesExec: target_batch_size=4096
-        RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-          ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4)]
-            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-                CoalesceBatchesExec: target_batch_size=4096
-                  RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
-                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], has_header=true
+--BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+----SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+------CoalesceBatchesExec: target_batch_size=4096
+--------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4)]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+--------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+----------------CoalesceBatchesExec: target_batch_size=4096
+------------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
+--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], has_header=true
 
 
 # test_window_agg_sort_reversed_plan
@@ -1308,17 +1308,17 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-        SortExec: expr=[c9@0 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query III
 SELECT
@@ -1349,17 +1349,17 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lag1, LAG(aggregate_test_100.c9,Int64(2), [...]
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-      WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,Int6 [...]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_orde [...]
-      BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_or [...]
-        SortExec: expr=[c9@0 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_orde [...]
+------BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_or [...]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query IIIIIII
 SELECT
@@ -1392,18 +1392,18 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS rn2
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-      SortExec: expr=[c9@0 ASC NULLS LAST]
-        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-          SortExec: expr=[c9@0 DESC]
-            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------SortExec: expr=[c9@0 ASC NULLS LAST]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+----------SortExec: expr=[c9@0 DESC]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 
 query III
@@ -1433,20 +1433,20 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING A [...]
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-          TableScan: aggregate_test_100 projection=[c1, c2, c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c2, c9]
 physical_plan
 ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDIN [...]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-      SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-            SortExec: expr=[c9@2 DESC,c1@0 DESC]
-              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------------SortExec: expr=[c9@2 DESC,c1@0 DESC]
+--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9], has_header=true
 
 query IIII
 SELECT
@@ -1509,35 +1509,35 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS d, SUM(null_cases.c1) ORDER BY [null_cases.c [...]
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-      Projection: null_cases.c1, null_cases.c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED [...]
-        WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-            WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-              WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-                WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-                  WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
-                    WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-                      TableScan: null_cases projection=[c1, c2, c3]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+------Projection: null_cases.c1, null_cases.c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED [...]
+--------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+----------------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+------------------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING]]
+--------------------WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------------------TableScan: null_cases projection=[c1, c2, c3]
 physical_plan
 ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@3 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11 as d, SUM(null_cases.c1) O [...]
-  GlobalLimitExec: skip=0, fetch=5
-    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
-      ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(null_cases.c1), SUM(null [...]
-        BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-          SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-            BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-              SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
-                BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-                  SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
-                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Current [...]
-                      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prece [...]
-                        SortExec: expr=[c3@2 DESC NULLS LAST]
-                          WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: P [...]
-                            BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-                              SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
-                                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2, c3], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
+------ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(null_cases.c1), SUM(null [...]
+--------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------------SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
+----------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------------------SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
+--------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Current [...]
+----------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prece [...]
+------------------------SortExec: expr=[c3@2 DESC NULLS LAST]
+--------------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: P [...]
+----------------------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------------------------------SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
+--------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2, c3], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -1602,17 +1602,17 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 
 query III
@@ -1646,17 +1646,17 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 query III
 SELECT
@@ -1690,19 +1690,19 @@ EXPLAIN SELECT c3,
 ----
 logical_plan
 Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-      Projection: aggregate_test_100.c3, aggregate_test_100.c4, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------Projection: aggregate_test_100.c3, aggregate_test_100.c4, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
 ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
-  GlobalLimitExec: skip=0, fetch=5
-    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
-      ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
-        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-          SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
-            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
+------ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
 
 
 query III
@@ -1733,30 +1733,30 @@ EXPLAIN SELECT count(*) as global_count FROM
 ----
 logical_plan
 Projection: COUNT(UInt8(1)) AS global_count
-  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
-    SubqueryAlias: a
-      Sort: aggregate_test_100.c1 ASC NULLS LAST
-        Projection: aggregate_test_100.c1
-          Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(UInt8(1))]]
-            Projection: aggregate_test_100.c1
-              Filter: aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
-                TableScan: aggregate_test_100 projection=[c1, c13], partial_filters=[aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+--Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+----SubqueryAlias: a
+------Sort: aggregate_test_100.c1 ASC NULLS LAST
+--------Projection: aggregate_test_100.c1
+----------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[COUNT(UInt8(1))]]
+------------Projection: aggregate_test_100.c1
+--------------Filter: aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
+----------------TableScan: aggregate_test_100 projection=[c1, c13], partial_filters=[aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
 physical_plan
 ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]
-  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
-    CoalescePartitionsExec
-      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
-        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2
-          ProjectionExec: expr=[c1@0 as c1]
-            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]
-              CoalesceBatchesExec: target_batch_size=4096
-                RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-                  AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]
-                    ProjectionExec: expr=[c1@0 as c1]
-                      CoalesceBatchesExec: target_batch_size=4096
-                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
-                          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
+--AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2
+----------ProjectionExec: expr=[c1@0 as c1]
+------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]
+--------------CoalesceBatchesExec: target_batch_size=4096
+----------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]
+--------------------ProjectionExec: expr=[c1@0 as c1]
+----------------------CoalesceBatchesExec: target_batch_size=4096
+------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
+--------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
 
 query I
 SELECT count(*) as global_count FROM
@@ -1784,25 +1784,25 @@ EXPLAIN SELECT c3,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
-    Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-        Projection: aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-            TableScan: aggregate_test_100 projection=[c2, c3, c9]
+--Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+----Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+--------Projection: aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+----------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+------------TableScan: aggregate_test_100 projection=[c2, c3, c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortPreservingMergeExec: [c3@0 ASC NULLS LAST]
-    ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c3@0 ASC NULLS LAST,c9@1 DESC]
-          CoalesceBatchesExec: target_batch_size=4096
-            RepartitionExec: partitioning=Hash([Column { name: "c3", index: 0 }], 2), input_partitions=2
-              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)]
-                  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-                    SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
-                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true
+--SortPreservingMergeExec: [c3@0 ASC NULLS LAST]
+----ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c3@0 ASC NULLS LAST,c9@1 DESC]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([Column { name: "c3", index: 0 }], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)]
+------------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------------------SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
+----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true
 
 
 
@@ -1830,18 +1830,18 @@ EXPLAIN SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_tes
 ----
 logical_plan
 Sort: aggregate_test_100.c1 ASC NULLS LAST
-  Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
-    WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-      TableScan: aggregate_test_100 projection=[c1]
+--Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+----WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+------TableScan: aggregate_test_100 projection=[c1]
 physical_plan
 SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
-    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
-      SortExec: expr=[c1@0 ASC NULLS LAST]
-        CoalesceBatchesExec: target_batch_size=4096
-          RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
+--ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+------SortExec: expr=[c1@0 ASC NULLS LAST]
+--------CoalesceBatchesExec: target_batch_size=4096
+----------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
 query TI
 SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC
@@ -1956,19 +1956,19 @@ EXPLAIN SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_tes
 ----
 logical_plan
 Sort: aggregate_test_100.c1 ASC NULLS LAST
-  Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
-    WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
-      TableScan: aggregate_test_100 projection=[c1]
+--Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
+----WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+------TableScan: aggregate_test_100 projection=[c1]
 physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
-  CoalescePartitionsExec
-    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
-      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
-        SortExec: expr=[c1@0 ASC NULLS LAST]
-          CoalesceBatchesExec: target_batch_size=4096
-            RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
+--CoalescePartitionsExec
+----ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+--------SortExec: expr=[c1@0 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
 statement ok
 set datafusion.optimizer.repartition_sorts = true;
@@ -1982,22 +1982,22 @@ EXPLAIN SELECT c1,
 ----
 logical_plan
 Sort: aggregate_test_100.c1 ASC NULLS LAST
-  Projection: aggregate_test_100.c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
-    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING]]
-        TableScan: aggregate_test_100 projection=[c1, c9]
+--Projection: aggregate_test_100.c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING]]
+--------TableScan: aggregate_test_100 projection=[c1, c9]
 physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
-  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
-    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]
-        SortExec: expr=[c9@1 ASC NULLS LAST]
-          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }], mode=[Sorted]
-            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
-              CoalesceBatchesExec: target_batch_size=4096
-                RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
-                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+--ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------SortPreservingMergeExec: [c9@1 ASC NULLS LAST]
+--------SortExec: expr=[c9@1 ASC NULLS LAST]
+----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }], mode=[Sorted]
+------------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
+--------------CoalesceBatchesExec: target_batch_size=4096
+----------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
+------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 # test_window_agg_with_global_limit
 statement ok
@@ -2008,19 +2008,19 @@ EXPLAIN SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_1
 ----
 logical_plan
 Projection: ARRAYAGG(aggregate_test_100.c13) AS array_agg1
-  Aggregate: groupBy=[[]], aggr=[[ARRAYAGG(aggregate_test_100.c13)]]
-    Limit: skip=0, fetch=1
-      Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1
-        TableScan: aggregate_test_100 projection=[c13]
+--Aggregate: groupBy=[[]], aggr=[[ARRAYAGG(aggregate_test_100.c13)]]
+----Limit: skip=0, fetch=1
+------Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1
+--------TableScan: aggregate_test_100 projection=[c13]
 physical_plan
 ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]
-  AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
-    CoalescePartitionsExec
-      AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
-        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-          GlobalLimitExec: skip=0, fetch=1
-            SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
-              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------GlobalLimitExec: skip=0, fetch=1
+------------SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
+--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true
 
 
 query ?
@@ -2069,25 +2069,25 @@ EXPLAIN SELECT
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
-    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
-      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM [...]
-          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-              WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-                TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+--Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+----Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
+------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM [...]
+----------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+------------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+----------------TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
-    ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_1 [...]
-      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-        ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLO [...]
-          WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-              WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-                SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
-                  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
+--SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+----ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_1 [...]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+--------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLO [...]
+----------WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+--------------WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+----------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
 
 
 
@@ -2123,30 +2123,30 @@ EXPLAIN SELECT c9,
 ----
 logical_plan
 Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAS [...]
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-      Projection: t1.c2, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING
-        WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-          Projection: t1.c2, t1.c8, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING
-            WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
-              WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
-                SubqueryAlias: t1
-                  Sort: aggregate_test_100.c9 ASC NULLS LAST
-                    Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
-                      TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+------Projection: t1.c2, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING
+--------WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+----------Projection: t1.c2, t1.c8, t1.c9, t1.c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING
+------------WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+--------------WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+----------------SubqueryAlias: t1
+------------------Sort: aggregate_test_100.c9 ASC NULLS LAST
+--------------------Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+----------------------TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [...]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-      ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND  [...]
-        WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-          SortExec: expr=[c2@0 ASC NULLS LAST,c1_alias@3 ASC NULLS LAST,c9@2 ASC NULLS LAST,c8@1 ASC NULLS LAST]
-            ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9, c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(t1.c9)]
-              BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-                WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-                  SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
-                    ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
-                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+------ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND  [...]
+--------WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+----------SortExec: expr=[c2@0 ASC NULLS LAST,c1_alias@3 ASC NULLS LAST,c9@2 ASC NULLS LAST,c8@1 ASC NULLS LAST]
+------------ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9, c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(t1.c9)]
+--------------BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+----------------WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+------------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
+--------------------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
+----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
 
 
 
@@ -2176,23 +2176,23 @@ EXPLAIN SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FO
 ----
 logical_plan
 Projection: sum1, sum2
-  Limit: skip=0, fetch=5
-    Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
-      Projection: SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING AS sum2, aggregate_test_100.c9
-        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING]]
-          Projection: aggregate_test_100.c1, aggregate_test_100.c9, aggregate_test_100.c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING
-            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
-              TableScan: aggregate_test_100 projection=[c1, c2, c9, c12]
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+------Projection: SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING AS sum2, aggregate_test_100.c9
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING]]
+----------Projection: aggregate_test_100.c1, aggregate_test_100.c9, aggregate_test_100.c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING
+------------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+--------------TableScan: aggregate_test_100 projection=[c1, c2, c9, c12]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
-  GlobalLimitExec: skip=0, fetch=5
-    SortExec: fetch=5, expr=[c9@2 ASC NULLS LAST]
-      ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@4 as sum2, c9@1 as c9]
-        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(5)), end_bound: Preceding(UInt64(3)) }], mode=[Sorted]
-          ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)]
-            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
-              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9, c12], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c9@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@4 as sum2, c9@1 as c9]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(5)), end_bound: Preceding(UInt64(3)) }], mode=[Sorted]
+----------ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+--------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9, c12], has_header=true
 
 query RR
 SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1,
@@ -2218,17 +2218,17 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: rn1 ASC NULLS LAST, fetch=5
-    Sort: aggregate_test_100.c9 ASC NULLS LAST
-      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c9]
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 ASC NULLS LAST
+------Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-      SortExec: expr=[c9@0 ASC NULLS LAST]
-        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------SortExec: expr=[c9@0 ASC NULLS LAST]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2257,17 +2257,17 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: rn1 ASC NULLS LAST, fetch=5
-    Sort: aggregate_test_100.c9 DESC NULLS FIRST
-      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c9]
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-      SortExec: expr=[c9@0 DESC]
-        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2296,18 +2296,18 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: rn1 DESC NULLS FIRST, fetch=5
-    Sort: aggregate_test_100.c9 DESC NULLS FIRST
-      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c9]
+--Sort: rn1 DESC NULLS FIRST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortExec: fetch=5, expr=[rn1@1 DESC]
-    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c9@0 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--SortExec: fetch=5, expr=[rn1@1 DESC]
+----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2339,18 +2339,18 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST, fetch=5
-    Sort: aggregate_test_100.c9 DESC NULLS FIRST
-      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c9]
+--Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
-    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-        SortExec: expr=[c9@0 DESC]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
+----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2368,15 +2368,15 @@ SELECT c9, rn1 FROM (SELECT c9,
 
 
 # invalid window frame. null as preceding
-statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers 
+statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers
 select row_number() over (rows between null preceding and current row) from (select 1 a) x
 
 # invalid window frame. null as preceding
-statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers 
+statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers
 select row_number() over (rows between null preceding and current row) from (select 1 a) x
 
 # invalid window frame. negative as following
-statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers 
+statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers
 select row_number() over (rows between current row and -1 following) from (select 1 a) x
 
 # This test shows that ordering satisfy considers ordering equivalences,
@@ -2393,17 +2393,17 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
-    Sort: aggregate_test_100.c9 DESC NULLS FIRST
-      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-          TableScan: aggregate_test_100 projection=[c9]
+--Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
-    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-      SortExec: expr=[c9@0 DESC]
-        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 # The following query has type error. We should test the error could be detected
 # from either the logical plan (when `skip_failed_rules` is set to `false`) or
@@ -2496,24 +2496,24 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, cntr2, sum4, cnt3
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
-      Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_f [...]
-        WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-          Projection: annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated [...]
-  WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NU [...]
-    WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts [...]
-      TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
+------Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_f [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+----------Projection: annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated [...]
+------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite [...]
+--------------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data [...]
+----------------TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
-  GlobalLimitExec: skip=0, fetch=5
-    SortExec: fetch=5, expr=[inc_col@24 DESC]
-      ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, [...]
-          ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data_finite.inc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data_finite.desc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts [...]
-  BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), f [...]
-    BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),  [...]
-      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[inc_col@24 DESC]
+------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, [...]
+----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data_finite.inc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data_finite.desc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadat [...]
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -2586,20 +2586,20 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: fv1, fv2, lv1, lv2, nv1, nv2, rn1, rn2, rank1, rank2, dense_rank1, dense_rank2, lag1, lag2, lead1, lead2, fvr1, fvr2, lvr1, lvr2, lagr1, lagr2, leadr1, leadr2
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
-      Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) [...]
-        WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER [...]
-          WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_co [...]
-  TableScan: annotated_data_finite projection=[ts, inc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
+------Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) [...]
+--------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER [...]
+----------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_co [...]
+------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]
-  GlobalLimitExec: skip=0, fetch=5
-    SortExec: fetch=5, expr=[ts@24 DESC]
-      ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_d [...]
-        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_i [...]
-          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict [...]
-  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[ts@24 DESC]
+------ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_d [...]
+--------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_i [...]
+----------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict [...]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -2658,20 +2658,20 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: sum1, sum2, min1, min2, max1, max2, count1, count2, avg1, avg2
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
-      Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
-        WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [a [...]
-          WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORD [...]
-  TableScan: annotated_data_finite projection=[ts, inc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+------Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [a [...]
+----------WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORD [...]
+------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]
-  GlobalLimitExec: skip=0, fetch=5
-    SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]
-      ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} [...]
-          BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:  [...]
-  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:  [...]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
 SELECT
@@ -2711,20 +2711,20 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: first_value1, first_value2, last_value1, last_value2, nth_value1
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
-      Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_va [...]
-        WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-          WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
-  TableScan: annotated_data_finite projection=[ts, inc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+------Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_va [...]
+--------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]
-  GlobalLimitExec: skip=0, fetch=5
-    SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]
-      ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOW [...]
-        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_ [...]
-          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dic [...]
-  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]
+------ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOW [...]
+--------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_ [...]
+----------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dic [...]
+------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIII
 SELECT
@@ -2757,19 +2757,19 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: sum1, sum2, count1, count2
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
-      Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
-        WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
-  TableScan: annotated_data_infinite projection=[ts, inc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
-  GlobalLimitExec: skip=0, fetch=5
-    ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
-      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 
 query IIII
@@ -2803,19 +2803,19 @@ EXPLAIN SELECT
 ----
 logical_plan
 Projection: sum1, sum2, count1, count2
-  Limit: skip=0, fetch=5
-    Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
-      Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
-        WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
-  TableScan: annotated_data_infinite projection=[ts, inc_col]
+--Limit: skip=0, fetch=5
+----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+------Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
-  GlobalLimitExec: skip=0, fetch=5
-    ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
-      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
-          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 
 query IIII
@@ -2901,24 +2901,24 @@ EXPLAIN SELECT a, b, c,
 ----
 logical_plan
 Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS [...]
-  Limit: skip=0, fetch=5
-    WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS L [...]
-      WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
-        WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLL [...]
-  WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
-    WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-      TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+--Limit: skip=0, fetch=5
+----WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS L [...]
+------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLL [...]
+------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+--------------WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+----------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL [...]
-  GlobalLimitExec: skip=0, fetch=5
-    BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
-      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wi [...]
-          BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:  [...]
-  BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
-    BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
-      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+--GlobalLimitExec: skip=0, fetch=5
+----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wi [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:  [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), fra [...]
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII
@@ -2968,31 +2968,31 @@ EXPLAIN SELECT a, b, c,
 ----
 logical_plan
 Limit: skip=0, fetch=5
-  Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
-    Projection: annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
-      WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN  [...]
-        WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
-          WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-  WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
-    WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
-      WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
-        TableScan: annotated_data_finite2 projection=[a, b, c, d]
+--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
+----Projection: annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
+------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN  [...]
+--------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+----------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
+--------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+----------------WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+------------------TableScan: annotated_data_finite2 projection=[a, b, c, d]
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
-  SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
-    ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BET [...]
-      BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
-        SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
-          BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
-  SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
-    BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {  [...]
-      SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]
-        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFram [...]
-          SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
-  BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { un [...]
-    SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
-      BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
-        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+--SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
+----ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BET [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
+--------SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
+----------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]
+------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:  [...]
+--------------------SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), fra [...]
+------------------------SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+--------------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
+----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 
 query IIIIIIIIIIIIIII