Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/29 12:39:15 UTC

[arrow-datafusion] branch master updated: Append generated column to the schema instead of prepending for WindowAggExec (#4746)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ad0459e93 Append generated column to the schema instead of prepending for WindowAggExec (#4746)
ad0459e93 is described below

commit ad0459e934c62b516459aac99d966f49ffdb6530
Author: Mustafa akur <10...@users.noreply.github.com>
AuthorDate: Thu Dec 29 15:39:08 2022 +0300

    Append generated column to the schema instead of prepending for WindowAggExec (#4746)
    
    * Append window result instead of prepending
    
    * Fix expected projections in tests
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../core/src/physical_optimizer/optimize_sorts.rs  |   2 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |  12 ++-
 .../src/physical_plan/windows/window_agg_exec.rs   | 109 ++++-----------------
 datafusion/core/tests/sql/window.rs                |  44 ++++-----
 datafusion/expr/src/logical_plan/builder.rs        |   4 +-
 5 files changed, 49 insertions(+), 122 deletions(-)
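
For readers skimming the diff: the essence of the change is the field order in the schema produced by WindowAggExec (and the logical WindowAggr node). Window result fields are now appended after the input fields instead of being prepended, so input columns keep their original indices. A minimal sketch of the new ordering, using the arrow-rs API of that era and made-up field names (not the actual DataFusion code):

    use arrow::datatypes::{DataType, Field, Schema};

    // Illustrative only: window result fields are appended after the input
    // fields, so existing input columns keep their original indices.
    fn window_output_schema(input: &Schema, window_fields: Vec<Field>) -> Schema {
        let mut fields = Vec::with_capacity(input.fields().len() + window_fields.len());
        fields.extend_from_slice(input.fields()); // input columns stay at 0..n
        fields.extend(window_fields);             // window results follow at n, n+1, ...
        Schema::new(fields)
    }

    fn main() {
        let input = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c9", DataType::UInt64, false),
        ]);
        let out = window_output_schema(&input, vec![Field::new("sum1", DataType::UInt64, true)]);
        // Before this commit "sum1" would have been field 0; now it comes last.
        assert_eq!(out.field(2).name(), "sum1");
    }
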

diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
index 0a3be1d5b..ed827c14e 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
@@ -677,7 +677,7 @@ mod tests {
             vec![
                 "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
                 "  FilterExec: NOT non_nullable_col@1",
-                "    SortExec: [non_nullable_col@2 ASC NULLS LAST]",
+                "    SortExec: [non_nullable_col@1 ASC NULLS LAST]",
                 "      WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
                 "        SortExec: [non_nullable_col@1 DESC]",
                 "          MemoryExec: partitions=0, partition_sizes=[]",
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index ffe9a82d0..4a261c0f3 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -265,7 +265,7 @@ mod tests {
                 schema.as_ref(),
             )?],
             input,
-            schema,
+            schema.clone(),
             vec![],
             None,
         )?);
@@ -273,9 +273,10 @@ mod tests {
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
         assert_eq!(result.len(), 1);
 
+        let n_schema_fields = schema.fields().len();
         let columns = result[0].columns();
 
-        let count: &Int64Array = as_primitive_array(&columns[0])?;
+        let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
         assert_eq!(count.value(0), 100);
         assert_eq!(count.value(99), 100);
         Ok(())
@@ -326,19 +327,20 @@ mod tests {
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
         assert_eq!(result.len(), 1);
 
+        let n_schema_fields = schema.fields().len();
         let columns = result[0].columns();
 
         // c3 is small int
 
-        let count: &Int64Array = as_primitive_array(&columns[0])?;
+        let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
         assert_eq!(count.value(0), 100);
         assert_eq!(count.value(99), 100);
 
-        let max: &Int8Array = as_primitive_array(&columns[1])?;
+        let max: &Int8Array = as_primitive_array(&columns[n_schema_fields + 1])?;
         assert_eq!(max.value(0), 125);
         assert_eq!(max.value(99), 125);
 
-        let min: &Int8Array = as_primitive_array(&columns[2])?;
+        let min: &Int8Array = as_primitive_array(&columns[n_schema_fields + 2])?;
         assert_eq!(min.value(0), -117);
         assert_eq!(min.value(99), -117);
 
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 23ec2d179..bd413ad8e 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    Column, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
@@ -39,8 +39,6 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::DataFusionError;
-use datafusion_physical_expr::rewrite::TreeNodeRewritable;
-use datafusion_physical_expr::EquivalentClass;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use log::debug;
@@ -65,8 +63,6 @@ pub struct WindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Sort Keys
     pub sort_keys: Option<Vec<PhysicalSortExpr>>,
-    /// The output ordering
-    output_ordering: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -82,33 +78,6 @@ impl WindowAggExec {
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
-        let window_expr_len = window_expr.len();
-        // Although WindowAggExec does not change the output ordering from the input, but can not return the output ordering
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let output_ordering = input
-            .output_ordering()
-            .map(|sort_exprs| {
-                let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
-                    .iter()
-                    .map(|e| {
-                        let new_expr = e.expr.clone().transform_down(&|e| {
-                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
-                                Arc::new(Column::new(
-                                    col.name(),
-                                    window_expr_len + col.index(),
-                                ))
-                                    as Arc<dyn PhysicalExpr>
-                            }))
-                        })?;
-                        Ok(PhysicalSortExpr {
-                            expr: new_expr,
-                            options: e.options,
-                        })
-                    })
-                    .collect();
-                new_sort_exprs
-            })
-            .map_or(Ok(None), |v| v.map(Some))?;
 
         Ok(Self {
             input,
@@ -117,7 +86,6 @@ impl WindowAggExec {
             input_schema,
             partition_keys,
             sort_keys,
-            output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -176,34 +144,10 @@ impl ExecutionPlan for WindowAggExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let window_expr_len = self.window_expr.len();
-        let input_partitioning = self.input.output_partitioning();
-        match input_partitioning {
-            Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
-            Partitioning::UnknownPartitioning(size) => {
-                Partitioning::UnknownPartitioning(size)
-            }
-            Partitioning::Hash(exprs, size) => {
-                let new_exprs = exprs
-                    .into_iter()
-                    .map(|expr| {
-                        expr.transform_down(&|e| {
-                            Ok(e.as_any().downcast_ref::<Column>().map(|col| {
-                                Arc::new(Column::new(
-                                    col.name(),
-                                    window_expr_len + col.index(),
-                                ))
-                                    as Arc<dyn PhysicalExpr>
-                            }))
-                        })
-                        .unwrap()
-                    })
-                    .collect::<Vec<_>>();
-                Partitioning::Hash(new_exprs, size)
-            }
-        }
+        // Because the input may have been repartitioned on the partition keys,
+        // the output partition count is either 1 or more than 1 depending on
+        // whether repartitioning is present.
+        self.input.output_partitioning()
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
@@ -221,7 +165,7 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.output_ordering.as_deref()
+        self.input().output_ordering()
     }
 
     fn maintains_input_order(&self) -> bool {
@@ -244,30 +188,7 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
-        // Although WindowAggExec does not change the equivalence properties from the input, but can not return the equivalence properties
-        // from the input directly, need to adjust the column index to align with the new schema.
-        let window_expr_len = self.window_expr.len();
-        let mut new_properties = EquivalenceProperties::new(self.schema());
-        let new_eq_classes = self
-            .input
-            .equivalence_properties()
-            .classes()
-            .iter()
-            .map(|prop| {
-                let new_head = Column::new(
-                    prop.head().name(),
-                    window_expr_len + prop.head().index(),
-                );
-                let new_others = prop
-                    .others()
-                    .iter()
-                    .map(|col| Column::new(col.name(), window_expr_len + col.index()))
-                    .collect::<Vec<_>>();
-                EquivalentClass::new(new_head, new_others)
-            })
-            .collect::<Vec<_>>();
-        new_properties.extend(new_eq_classes);
-        new_properties
+        self.input().equivalence_properties()
     }
 
     fn with_new_children(
@@ -334,12 +255,13 @@ impl ExecutionPlan for WindowAggExec {
         let win_cols = self.window_expr.len();
         let input_cols = self.input_schema.fields().len();
         // TODO stats: some windowing function will maintain invariants such as min, max...
-        let mut column_statistics = vec![ColumnStatistics::default(); win_cols];
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
         if let Some(input_col_stats) = input_stat.column_statistics {
             column_statistics.extend(input_col_stats);
         } else {
             column_statistics.extend(vec![ColumnStatistics::default(); input_cols]);
         }
+        column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
         Statistics {
             is_exact: input_stat.is_exact,
             num_rows: input_stat.num_rows,
@@ -354,10 +276,11 @@ fn create_schema(
     window_expr: &[Arc<dyn WindowExpr>],
 ) -> Result<Schema> {
     let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
+    fields.extend_from_slice(input_schema.fields());
+    // append results to the schema
     for expr in window_expr {
         fields.push(expr.field()?);
     }
-    fields.extend_from_slice(input_schema.fields());
     Ok(Schema::new(fields))
 }
 
@@ -433,7 +356,7 @@ impl WindowAggStream {
                 .map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
             )
         }
-        let mut columns = transpose(partition_results)
+        let columns = transpose(partition_results)
             .iter()
             .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
             .collect::<Vec<_>>()
@@ -442,9 +365,11 @@ impl WindowAggStream {
 
         // combine with the original cols
         // note the setup of window aggregates is that the newly calculated window
-        // expressions are always prepended to the columns
-        columns.extend_from_slice(batch.columns());
-        RecordBatch::try_new(self.schema.clone(), columns)
+        // expression results are always appended to the columns
+        let mut batch_columns = batch.columns().to_vec();
+        // append the computed window columns
+        batch_columns.extend_from_slice(&columns);
+        RecordBatch::try_new(self.schema.clone(), batch_columns)
     }
 
     /// Evaluates the partition points given the sort columns. If the sort columns are
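
The stream-side change above now copies the input columns first and appends the computed window columns. A minimal, stand-alone sketch of that assembly step, with invented names and without the partition/concat machinery of the real implementation:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, UInt32Array, UInt64Array};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use arrow::record_batch::RecordBatch;

    // Illustrative only: build the output batch as input columns followed by the
    // window result columns, matching the appended-schema layout.
    fn assemble_output(
        output_schema: SchemaRef,
        input: &RecordBatch,
        window_columns: Vec<ArrayRef>,
    ) -> arrow::error::Result<RecordBatch> {
        let mut columns = input.columns().to_vec();
        columns.extend(window_columns);
        RecordBatch::try_new(output_schema, columns)
    }

    fn main() -> arrow::error::Result<()> {
        let input_schema = Arc::new(Schema::new(vec![Field::new("c9", DataType::UInt32, false)]));
        let output_schema = Arc::new(Schema::new(vec![
            Field::new("c9", DataType::UInt32, false),
            Field::new("SUM(c9)", DataType::UInt64, true),
        ]));
        let c9: ArrayRef = Arc::new(UInt32Array::from(vec![1, 2, 3]));
        let input = RecordBatch::try_new(input_schema, vec![c9])?;
        let sums: ArrayRef = Arc::new(UInt64Array::from(vec![6, 6, 6]));

        let out = assemble_output(output_schema, &input, vec![sums])?;
        assert_eq!(out.schema().field(1).name(), "SUM(c9)");
        Ok(())
    }
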
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 32438b610..2f997ca28 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -398,7 +398,7 @@ async fn window_expr_eliminate() -> Result<()> {
         "        SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]",
         "          SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]",
         "            Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]",
-        "              WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, a:Int64, b:Utf8]",
+        "              WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [a:Int64, b:Utf8, ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]",
         "                SubqueryAlias: s [a:Int64, b:Utf8]",
         "                  SubqueryAlias: _sample_data [a:Int64, b:Utf8]",
         "                    Union [a:Int64, b:Utf8]",
@@ -1651,7 +1651,7 @@ async fn test_window_agg_sort() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
+            "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
@@ -1684,12 +1684,12 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
+            "ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
             "      WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "        WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "          SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+            "          SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
         ]
     };
 
@@ -1720,11 +1720,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
         vec![
             "SortExec: [c2@0 ASC NULLS LAST]",
             "  CoalescePartitionsExec",
-            "    ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
+            "    ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
             "      RepartitionExec: partitioning=RoundRobinBatch(2)",
             "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
             "          WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: [c9@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "            SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
             "              WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "                SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
         ]
@@ -1757,11 +1757,11 @@ async fn test_window_partition_by_order_by() -> Result<()> {
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1))]",
             "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
+            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
             "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
             "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "              CoalesceBatchesExec: target_batch_size=4096",
@@ -1799,7 +1799,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
+            "ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
@@ -1856,7 +1856,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_tes [...]
+            "ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_tes [...]
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0,  [...]
@@ -1909,11 +1909,11 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
     // We cannot reverse each window function (ROW_NUMBER is not reversible)
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
+            "ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@0 ASC NULLS LAST]",
             "          WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "            SortExec: [c9@0 DESC]",
         ]
@@ -1964,11 +1964,11 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
     // We cannot reverse each window function (ROW_NUMBER is not reversible)
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
+            "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
+            "        SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "          WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "            WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "              SortExec: [c9@2 DESC,c1@0 DESC]",
@@ -2053,14 +2053,14 @@ async fn test_window_agg_complex_plan() -> Result<()> {
     // Unnecessary SortExecs are removed
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_ [...]
+            "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(nul [...]
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
             "        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "          SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
+            "          SortExec: [c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "            WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "              SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
+            "              SortExec: [c3@2 ASC NULLS LAST,c1@0 ASC]",
             "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
             "                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
             "                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta [...]
@@ -2103,7 +2103,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]",
+            "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2159,7 +2159,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
+            "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
@@ -2214,7 +2214,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
+            "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
@@ -2328,7 +2328,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
+            "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
             "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 6bb3cf80e..50325cfde 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -681,8 +681,8 @@ impl LogicalPlanBuilder {
         let window_expr = normalize_cols(window_expr, &self.plan)?;
         let all_expr = window_expr.iter();
         validate_unique_names("Windows", all_expr.clone())?;
-        let mut window_fields: Vec<DFField> = exprlist_to_fields(all_expr, &self.plan)?;
-        window_fields.extend_from_slice(self.plan.schema().fields());
+        let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone();
+        window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?);
         let metadata = self.plan.schema().metadata().clone();
 
         Ok(Self::from(LogicalPlan::Window(Window {
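
The builder.rs hunk mirrors the same reordering at the logical plan level: the Window node's schema is now the input schema's fields followed by one field per window expression. A deliberately simplified sketch, with plain strings standing in for DFField (not the actual builder code):

    // Simplified illustration only: the logical Window node's fields are the
    // input schema's fields followed by the window expression fields, instead
    // of the other way around.
    fn window_node_fields(input_fields: &[String], window_expr_fields: &[String]) -> Vec<String> {
        let mut fields = Vec::with_capacity(input_fields.len() + window_expr_fields.len());
        fields.extend_from_slice(input_fields);       // input columns keep their positions
        fields.extend_from_slice(window_expr_fields); // window results are appended
        fields
    }

    fn main() {
        let fields = window_node_fields(
            &["a".to_string(), "b".to_string()],
            &["ROW_NUMBER()".to_string()],
        );
        assert_eq!(fields, vec!["a", "b", "ROW_NUMBER()"]);
    }
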