You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/29 12:39:15 UTC
[arrow-datafusion] branch master updated: Append generated column to the schema instead of prepending for WindowAggExec (#4746)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ad0459e93 Append generated column to the schema instead of prepending for WindowAggExec (#4746)
ad0459e93 is described below
commit ad0459e934c62b516459aac99d966f49ffdb6530
Author: Mustafa akur <10...@users.noreply.github.com>
AuthorDate: Thu Dec 29 15:39:08 2022 +0300
Append generated column to the schema instead of prepending for WindowAggExec (#4746)
* Append window result instead of prepending
* Fix expected projections in tests
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../core/src/physical_optimizer/optimize_sorts.rs | 2 +-
datafusion/core/src/physical_plan/windows/mod.rs | 12 ++-
.../src/physical_plan/windows/window_agg_exec.rs | 109 ++++-----------------
datafusion/core/tests/sql/window.rs | 44 ++++-----
datafusion/expr/src/logical_plan/builder.rs | 4 +-
5 files changed, 49 insertions(+), 122 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
index 0a3be1d5b..ed827c14e 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/optimize_sorts.rs
@@ -677,7 +677,7 @@ mod tests {
vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
" FilterExec: NOT non_nullable_col@1",
- " SortExec: [non_nullable_col@2 ASC NULLS LAST]",
+ " SortExec: [non_nullable_col@1 ASC NULLS LAST]",
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
" SortExec: [non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index ffe9a82d0..4a261c0f3 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -265,7 +265,7 @@ mod tests {
schema.as_ref(),
)?],
input,
- schema,
+ schema.clone(),
vec![],
None,
)?);
@@ -273,9 +273,10 @@ mod tests {
let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
assert_eq!(result.len(), 1);
+ let n_schema_fields = schema.fields().len();
let columns = result[0].columns();
- let count: &Int64Array = as_primitive_array(&columns[0])?;
+ let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
assert_eq!(count.value(0), 100);
assert_eq!(count.value(99), 100);
Ok(())
@@ -326,19 +327,20 @@ mod tests {
let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
assert_eq!(result.len(), 1);
+ let n_schema_fields = schema.fields().len();
let columns = result[0].columns();
// c3 is small int
- let count: &Int64Array = as_primitive_array(&columns[0])?;
+ let count: &Int64Array = as_primitive_array(&columns[n_schema_fields])?;
assert_eq!(count.value(0), 100);
assert_eq!(count.value(99), 100);
- let max: &Int8Array = as_primitive_array(&columns[1])?;
+ let max: &Int8Array = as_primitive_array(&columns[n_schema_fields + 1])?;
assert_eq!(max.value(0), 125);
assert_eq!(max.value(99), 125);
- let min: &Int8Array = as_primitive_array(&columns[2])?;
+ let min: &Int8Array = as_primitive_array(&columns[n_schema_fields + 2])?;
assert_eq!(min.value(0), -117);
assert_eq!(min.value(99), -117);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 23ec2d179..bd413ad8e 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
- Column, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+ ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
@@ -39,8 +39,6 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::DataFusionError;
-use datafusion_physical_expr::rewrite::TreeNodeRewritable;
-use datafusion_physical_expr::EquivalentClass;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use log::debug;
@@ -65,8 +63,6 @@ pub struct WindowAggExec {
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Sort Keys
pub sort_keys: Option<Vec<PhysicalSortExpr>>,
- /// The output ordering
- output_ordering: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -82,33 +78,6 @@ impl WindowAggExec {
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
- let window_expr_len = window_expr.len();
- // Although WindowAggExec does not change the output ordering from the input, but can not return the output ordering
- // from the input directly, need to adjust the column index to align with the new schema.
- let output_ordering = input
- .output_ordering()
- .map(|sort_exprs| {
- let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
- .iter()
- .map(|e| {
- let new_expr = e.expr.clone().transform_down(&|e| {
- Ok(e.as_any().downcast_ref::<Column>().map(|col| {
- Arc::new(Column::new(
- col.name(),
- window_expr_len + col.index(),
- ))
- as Arc<dyn PhysicalExpr>
- }))
- })?;
- Ok(PhysicalSortExpr {
- expr: new_expr,
- options: e.options,
- })
- })
- .collect();
- new_sort_exprs
- })
- .map_or(Ok(None), |v| v.map(Some))?;
Ok(Self {
input,
@@ -117,7 +86,6 @@ impl WindowAggExec {
input_schema,
partition_keys,
sort_keys,
- output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -176,34 +144,10 @@ impl ExecutionPlan for WindowAggExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
- // from the input directly, need to adjust the column index to align with the new schema.
- let window_expr_len = self.window_expr.len();
- let input_partitioning = self.input.output_partitioning();
- match input_partitioning {
- Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
- Partitioning::UnknownPartitioning(size) => {
- Partitioning::UnknownPartitioning(size)
- }
- Partitioning::Hash(exprs, size) => {
- let new_exprs = exprs
- .into_iter()
- .map(|expr| {
- expr.transform_down(&|e| {
- Ok(e.as_any().downcast_ref::<Column>().map(|col| {
- Arc::new(Column::new(
- col.name(),
- window_expr_len + col.index(),
- ))
- as Arc<dyn PhysicalExpr>
- }))
- })
- .unwrap()
- })
- .collect::<Vec<_>>();
- Partitioning::Hash(new_exprs, size)
- }
- }
+ // because we can have repartitioning using the partition keys
+ // this would be either 1 or more than 1 depending on the presence of
+ // repartitioning
+ self.input.output_partitioning()
}
/// Specifies whether this plan generates an infinite stream of records.
@@ -221,7 +165,7 @@ impl ExecutionPlan for WindowAggExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.output_ordering.as_deref()
+ self.input().output_ordering()
}
fn maintains_input_order(&self) -> bool {
@@ -244,30 +188,7 @@ impl ExecutionPlan for WindowAggExec {
}
fn equivalence_properties(&self) -> EquivalenceProperties {
- // Although WindowAggExec does not change the equivalence properties from the input, but can not return the equivalence properties
- // from the input directly, need to adjust the column index to align with the new schema.
- let window_expr_len = self.window_expr.len();
- let mut new_properties = EquivalenceProperties::new(self.schema());
- let new_eq_classes = self
- .input
- .equivalence_properties()
- .classes()
- .iter()
- .map(|prop| {
- let new_head = Column::new(
- prop.head().name(),
- window_expr_len + prop.head().index(),
- );
- let new_others = prop
- .others()
- .iter()
- .map(|col| Column::new(col.name(), window_expr_len + col.index()))
- .collect::<Vec<_>>();
- EquivalentClass::new(new_head, new_others)
- })
- .collect::<Vec<_>>();
- new_properties.extend(new_eq_classes);
- new_properties
+ self.input().equivalence_properties()
}
fn with_new_children(
@@ -334,12 +255,13 @@ impl ExecutionPlan for WindowAggExec {
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
- let mut column_statistics = vec![ColumnStatistics::default(); win_cols];
+ let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
column_statistics.extend(input_col_stats);
} else {
column_statistics.extend(vec![ColumnStatistics::default(); input_cols]);
}
+ column_statistics.extend(vec![ColumnStatistics::default(); win_cols]);
Statistics {
is_exact: input_stat.is_exact,
num_rows: input_stat.num_rows,
@@ -354,10 +276,11 @@ fn create_schema(
window_expr: &[Arc<dyn WindowExpr>],
) -> Result<Schema> {
let mut fields = Vec::with_capacity(input_schema.fields().len() + window_expr.len());
+ fields.extend_from_slice(input_schema.fields());
+ // append results to the schema
for expr in window_expr {
fields.push(expr.field()?);
}
- fields.extend_from_slice(input_schema.fields());
Ok(Schema::new(fields))
}
@@ -433,7 +356,7 @@ impl WindowAggStream {
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
)
}
- let mut columns = transpose(partition_results)
+ let columns = transpose(partition_results)
.iter()
.map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::<Vec<_>>()))
.collect::<Vec<_>>()
@@ -442,9 +365,11 @@ impl WindowAggStream {
// combine with the original cols
// note the setup of window aggregates is that the newly calculated window
- // expressions are always prepended to the columns
- columns.extend_from_slice(batch.columns());
- RecordBatch::try_new(self.schema.clone(), columns)
+ // expression results are always appended to the columns
+ let mut batch_columns = batch.columns().to_vec();
+ // calculate window cols
+ batch_columns.extend_from_slice(&columns);
+ RecordBatch::try_new(self.schema.clone(), batch_columns)
}
/// Evaluates the partition points given the sort columns. If the sort columns are
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 32438b610..2f997ca28 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -398,7 +398,7 @@ async fn window_expr_eliminate() -> Result<()> {
" SubqueryAlias: d [seq:UInt64;N, a:Int64, b:Utf8]",
" SubqueryAlias: _data2 [seq:UInt64;N, a:Int64, b:Utf8]",
" Projection: ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS seq, s.a, s.b [seq:UInt64;N, a:Int64, b:Utf8]",
- " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N, a:Int64, b:Utf8]",
+ " WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [a:Int64, b:Utf8, ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64;N]",
" SubqueryAlias: s [a:Int64, b:Utf8]",
" SubqueryAlias: _sample_data [a:Int64, b:Utf8]",
" Union [a:Int64, b:Utf8]",
@@ -1651,7 +1651,7 @@ async fn test_window_agg_sort() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
+ "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
@@ -1684,12 +1684,12 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
+ "ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+ " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
]
};
@@ -1720,11 +1720,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
vec![
"SortExec: [c2@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
- " ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
+ " ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c9@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
" WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
]
@@ -1757,11 +1757,11 @@ async fn test_window_partition_by_order_by() -> Result<()> {
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
let expected = {
vec![
- "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+ "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1))]",
" WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
- " SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
+ " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
@@ -1799,7 +1799,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
+ "ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
@@ -1856,7 +1856,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_tes [...]
+ "ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_tes [...]
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, [...]
@@ -1909,11 +1909,11 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
// We cannot reverse each window function (ROW_NUMBER is not reversible)
let expected = {
vec![
- "ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
+ "ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@1 ASC NULLS LAST]",
+ " SortExec: [c9@0 ASC NULLS LAST]",
" WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" SortExec: [c9@0 DESC]",
]
@@ -1964,11 +1964,11 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
// We cannot reverse each window function (ROW_NUMBER is not reversible)
let expected = {
vec![
- "ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
+ "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
+ " SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" SortExec: [c9@2 DESC,c1@0 DESC]",
@@ -2053,14 +2053,14 @@ async fn test_window_agg_complex_plan() -> Result<()> {
// Unnecessary SortExecs are removed
let expected = {
vec![
- "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_ [...]
+ "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(nul [...]
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
+ " SortExec: [c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
+ " SortExec: [c3@2 ASC NULLS LAST,c1@0 ASC]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta [...]
@@ -2103,7 +2103,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]",
+ "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2159,7 +2159,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
+ "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
@@ -2214,7 +2214,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
+ "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
@@ -2328,7 +2328,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
// Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
+ "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 6bb3cf80e..50325cfde 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -681,8 +681,8 @@ impl LogicalPlanBuilder {
let window_expr = normalize_cols(window_expr, &self.plan)?;
let all_expr = window_expr.iter();
validate_unique_names("Windows", all_expr.clone())?;
- let mut window_fields: Vec<DFField> = exprlist_to_fields(all_expr, &self.plan)?;
- window_fields.extend_from_slice(self.plan.schema().fields());
+ let mut window_fields: Vec<DFField> = self.plan.schema().fields().clone();
+ window_fields.extend_from_slice(&exprlist_to_fields(all_expr, &self.plan)?);
let metadata = self.plan.schema().metadata().clone();
Ok(Self::from(LogicalPlan::Window(Window {