Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/24 14:45:02 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6419: Named window support

alamb commented on code in PR #6419:
URL: https://github.com/apache/arrow-datafusion/pull/6419#discussion_r1204127393


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3023,3 +3023,64 @@ drop table annotated_data_finite2
 
 statement ok
 drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.
+# The query should still work.
+query RR
+SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window2 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12),
+  window2 AS (PARTITION BY C11),
+  window3 AS (ORDER BY C1)
+  ORDER BY C3
+  LIMIT 5
+----
+0.970671228336 0.970671228336
+0.850672105305 0.850672105305
+0.152498292972 0.152498292972
+0.369363046006 0.369363046006
+0.56535284223 0.56535284223
+
+query TT
+EXPLAIN SELECT
+  MAX(c12) OVER window1 as min1,
+  MIN(c12) OVER window2 as max1
+  FROM aggregate_test_100
+  WINDOW window1 AS (ORDER BY C12),
+  window2 AS (PARTITION BY C11),
+  window3 AS (ORDER BY C1)
+  ORDER BY C3
+  LIMIT 5
+----
+logical_plan
+Projection: min1, max1
+--Limit: skip=0, fetch=5
+----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1, aggregate_test_100.c3
+--------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------Projection: aggregate_test_100.c3, aggregate_test_100.c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+------------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+--------------TableScan: aggregate_test_100 projection=[c3, c11, c12]
+physical_plan
+ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
+--GlobalLimitExec: skip=0, fetch=5
+----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
+------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as max1, c3@0 as c3]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------SortExec: expr=[c12@1 ASC NULLS LAST]
+------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12)]
+--------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12): Ok(Field { name: "MIN(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+----------------SortExec: expr=[c11@1 ASC NULLS LAST]
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, c11, c12], has_header=true
+
+# window2 spec is not defined

Review Comment:
   👍 



##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // process `where` clause
         let plan = self.plan_selection(select.selection, plan, planner_context)?;
 
+        // handle named windows before processing the projection expression

Review Comment:
   I think this would read better if the code that expands named window definitions were refactored into a helper function, perhaps something like `fn expand_window_definitions()`.
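   
   A rough sketch of what such a helper could look like. The types here are simplified stand-ins for the sqlparser AST types in the diff (only the `over` field matters), and the signature is hypothetical, so treat it as an illustration of the shape rather than code that drops into `select.rs`:
   
   ```rust
   // Simplified stand-ins for the sqlparser AST types used in the diff;
   // the real types carry more fields and variants.
   #[derive(Clone, Debug, PartialEq)]
   struct WindowSpec {
       order_by: Vec<String>,
   }
   
   #[derive(Clone, Debug, PartialEq)]
   enum WindowType {
       WindowSpec(WindowSpec),
       NamedWindow(String),
   }
   
   struct NamedWindowDefinition(String, WindowSpec);
   
   #[derive(Debug, PartialEq)]
   struct Function {
       name: String,
       over: Option<WindowType>,
   }
   
   /// Hypothetical helper: replaces each `OVER <name>` with the spec defined
   /// in the WINDOW clause, and returns an error if the name is not defined.
   fn expand_window_definitions(
       functions: &mut [Function],
       named_windows: &[NamedWindowDefinition],
   ) -> Result<(), String> {
       for f in functions.iter_mut() {
           // Only functions that reference a window by name need rewriting.
           let ident = match &f.over {
               Some(WindowType::NamedWindow(ident)) => ident.clone(),
               _ => continue,
           };
           // Look up the definition with the same name and copy its spec.
           let spec = named_windows
               .iter()
               .find(|def| def.0 == ident)
               .map(|def| def.1.clone())
               .ok_or_else(|| format!("window `{ident}` is not defined"))?;
           f.over = Some(WindowType::WindowSpec(spec));
       }
       Ok(())
   }
   ```
   
   With the lookup in one place, the "undefined window" error falls out of the failed `find` instead of needing a separate pass over the projection.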



##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // process `where` clause
         let plan = self.plan_selection(select.selection, plan, planner_context)?;
 
+        // handle named windows before processing the projection expression
+        let mut modified_projection = select.projection.clone();
+        // If the projection is done over a named window, that window
+        // name must be defined. Otherwise, it gives an error.
+        for proj in modified_projection.iter_mut() {
+            if let SelectItem::ExprWithAlias {
+                expr: SQLExpr::Function(f),
+                alias: _,
+            } = proj
+            {
+                for NamedWindowDefinition(window_ident, window_spec) in
+                    select.named_window.iter()
+                {
+                    if let Some(WindowType::NamedWindow(ident)) = &f.over {
+                        if ident.eq(window_ident) {
+                            f.over = Some(WindowType::WindowSpec(window_spec.clone()))
+                        }
+                    }
+                }
+                // All named windows must be defined with a WindowSpec.

Review Comment:
   Is it worth checking here for duplicated window names (i.e. two definitions of `window1`)?
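   
   One possible shape for such a check, as a minimal sketch. The function name and the plain `&str` names are assumptions for illustration (the real code would work on the identifiers from `select.named_window`), and the comparison is exact, so any case normalization of identifiers is left out:
   
   ```rust
   use std::collections::HashSet;
   
   /// Hypothetical check: returns an error naming the first window
   /// identifier that is defined more than once in a WINDOW clause.
   fn check_duplicate_window_names(names: &[&str]) -> Result<(), String> {
       let mut seen = HashSet::new();
       for name in names {
           // `insert` returns false when the value was already present.
           if !seen.insert(*name) {
               return Err(format!("window `{name}` is defined more than once"));
           }
       }
       Ok(())
   }
   ```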



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3023,3 +3023,64 @@ drop table annotated_data_finite2
 
 statement ok
 drop table annotated_data_infinite2
+
+# window3 spec is not used in window functions.

Review Comment:
   Another test that might be valuable is to use the same named window more than once, as I think this is one reason named windows are often used.
   
   Something like this perhaps, which uses `window1` more than once:
   
   ```sql
   SELECT
     MAX(c12) OVER window1 as min1,
     MIN(c12) OVER window1 as max1
     FROM aggregate_test_100
     WINDOW window1 AS (ORDER BY C12)
     ORDER BY C3
     LIMIT 5
   ```



##########
datafusion/sql/src/select.rs:
##########
@@ -70,10 +71,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // process `where` clause
         let plan = self.plan_selection(select.selection, plan, planner_context)?;
 
+        // handle named windows before processing the projection expression
+        let mut modified_projection = select.projection.clone();

Review Comment:
   You may be able to avoid a clone here by modifying `select.projection` in place.
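   
   A small sketch of the idea, assuming `select` is owned by the function (the move of `select.selection` earlier in the diff suggests it is). `Select` here is a one-field stand-in for the real struct, and the uppercasing is only a placeholder for the window substitution:
   
   ```rust
   // Stand-in for the sqlparser `Select` struct; only the field of interest.
   struct Select {
       projection: Vec<String>,
   }
   
   // Taking ownership with `mut` lets the loop edit the existing vector,
   // so no copy of the projection is made.
   fn rewrite_in_place(mut select: Select) -> Select {
       for item in select.projection.iter_mut() {
           // Placeholder edit standing in for the named-window rewrite.
           item.make_ascii_uppercase();
       }
       select
   }
   ```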


