Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/24 14:45:02 UTC
[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6419: Named window support
alamb commented on code in PR #6419:
URL: https://github.com/apache/arrow-datafusion/pull/6419#discussion_r1204127393
##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3023,3 +3023,64 @@ drop table annotated_data_finite2
statement ok
drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.
+# The query should still work.
+query RR
+SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window2 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12),
+ window2 AS (PARTITION BY C11),
+ window3 AS (ORDER BY C1)
+ ORDER BY C3
+ LIMIT 5
+----
+0.970671228336 0.970671228336
+0.850672105305 0.850672105305
+0.152498292972 0.152498292972
+0.369363046006 0.369363046006
+0.56535284223 0.56535284223
+
+query TT
+EXPLAIN SELECT
+ MAX(c12) OVER window1 as min1,
+ MIN(c12) OVER window2 as max1
+ FROM aggregate_test_100
+ WINDOW window1 AS (ORDER BY C12),
+ window2 AS (PARTITION BY C11),
+ window3 AS (ORDER BY C1)
+ ORDER BY C3
+ LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------Projection: aggregate_test_100.c3, aggregate_test_100.c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+------------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+--------------TableScan: aggregate_test_100 projection=[c3, c11, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12)]
+--------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12): Ok(Field { name: "MIN(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+----------------SortExec: expr=[c11@1 ASC NULLS LAST]
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, c11, c12], has_header=true
+
+# window2 spec is not defined
Review Comment:
👍
##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// process `where` clause
let plan = self.plan_selection(select.selection, plan, planner_context)?;
+ // handle named windows before processing the projection expression
Review Comment:
I think this would read better if the code that expands named window definitions were refactored into its own function, perhaps something like `fn expand_window_definitions()`.
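A rough sketch of the shape such a helper might take. It uses simplified stand-in types rather than the real sqlparser AST: `WindowSpec`, `WindowType`, and `Function` here are cut-down illustrations, and the signature and error message are assumptions, not the PR's code.

```rust
// Simplified stand-ins for the sqlparser AST types used in the diff;
// the real types carry many more fields.
#[derive(Debug, Clone, PartialEq)]
struct WindowSpec {
    order_by: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
enum WindowType {
    WindowSpec(WindowSpec),
    NamedWindow(String),
}

#[derive(Debug, Clone, PartialEq)]
struct Function {
    name: String,
    over: Option<WindowType>,
}

/// Hypothetical helper: replace each `OVER <name>` with the matching spec,
/// and return an error for a name that has no definition.
fn expand_window_definitions(
    functions: &mut [Function],
    named_windows: &[(String, WindowSpec)],
) -> Result<(), String> {
    for f in functions.iter_mut() {
        if let Some(WindowType::NamedWindow(ident)) = &f.over {
            // Look up the definition by name; unused definitions are simply ignored.
            let spec = named_windows
                .iter()
                .find(|(name, _)| name == ident)
                .map(|(_, spec)| spec.clone())
                .ok_or_else(|| format!("window {} is not defined", ident))?;
            f.over = Some(WindowType::WindowSpec(spec));
        }
    }
    Ok(())
}
```

With something like this, the body of the planner method would shrink to a single call, and the "not defined" error path would live next to the lookup.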
##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// process `where` clause
let plan = self.plan_selection(select.selection, plan, planner_context)?;
+ // handle named windows before processing the projection expression
+ let mut modified_projection = select.projection.clone();
+ // If the projection is done over a named window, that window
+ // name must be defined. Otherwise, it gives an error.
+ for proj in modified_projection.iter_mut() {
+ if let SelectItem::ExprWithAlias {
+ expr: SQLExpr::Function(f),
+ alias: _,
+ } = proj
+ {
+ for NamedWindowDefinition(window_ident, window_spec) in
+ select.named_window.iter()
+ {
+ if let Some(WindowType::NamedWindow(ident)) = &f.over {
+ if ident.eq(window_ident) {
+ f.over = Some(WindowType::WindowSpec(window_spec.clone()))
+ }
+ }
+ }
+ // All named windows must be defined with a WindowSpec.
Review Comment:
Is it worth checking here for duplicated window names (i.e., two definitions of `window1`)?
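One possible way to do such a check, sketched over plain string names. `find_duplicate_window_name` is a hypothetical helper, and comparing names by exact string equality is an assumption; the real code would compare the parser's identifier type.

```rust
use std::collections::HashSet;

/// Hypothetical helper: return the first window name that is defined twice,
/// or `None` when all names are distinct.
fn find_duplicate_window_name<'a>(names: &[&'a str]) -> Option<&'a str> {
    let mut seen = HashSet::new();
    // `HashSet::insert` returns false when the value was already present.
    names.iter().copied().find(|name| !seen.insert(*name))
}
```

The planner could turn a `Some(name)` result into a planning error before any window is expanded.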
##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3023,3 +3023,64 @@ drop table annotated_data_finite2
statement ok
drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.
Review Comment:
Another valuable test would be to use the same named window more than once, since I think that is one reason named windows are often used.
Something like this perhaps, which uses `window1` more than once:
```sql
SELECT
MAX(c12) OVER window1 as min1,
MIN(c12) OVER window1 as max1
FROM aggregate_test_100
WINDOW window1 AS (ORDER BY C12)
ORDER BY C3
LIMIT 5
```
##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// process `where` clause
let plan = self.plan_selection(select.selection, plan, planner_context)?;
+ // handle named windows before processing the projection expression
+ let mut modified_projection = select.projection.clone();
Review Comment:
You may be able to avoid a clone here by modifying `select.projection` in place.
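A minimal illustration of the idea, assuming the function owns `select` (which the `select.selection` move in the hunk above suggests). `Select` here is a one-field stand-in, and the uppercase rewrite is only a placeholder for the named-window expansion.

```rust
// Stand-in for the parsed SELECT; the real sqlparser `Select` has many more fields.
struct Select {
    projection: Vec<String>,
}

// Binding the owned `select` as `mut` lets the projection be rewritten
// where it lives, with no intermediate `clone()`.
fn rewrite_projection_in_place(mut select: Select) -> Vec<String> {
    for item in select.projection.iter_mut() {
        // Placeholder rewrite; the PR would substitute window specs here.
        *item = item.to_uppercase();
    }
    select.projection
}
```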