Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/02/07 22:33:22 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #10228: [multistage] Initial (phase 1) query planner support for window functions

walterddr commented on code in PR #10228:
URL: https://github.com/apache/pinot/pull/10228#discussion_r1099333999


##########
pinot-query-planner/src/test/resources/queries/GroupBy.json:
##########
@@ -0,0 +1,78 @@
+{
+  "group_by_planning_tests": {
+    "queries": [
+      {
+        "description": "Group by with select and aggregate column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[$SUM0($1)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[$SUM0($1)])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by count(*) with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[COUNT()])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",

Review Comment:
   This one looks incorrect: the project should only keep col1, because col2 and col3 have already been pushed down into the filter. Could you add a TODO comment in the JSON indicating that this needs follow-up?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import com.clearspring.analytics.util.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class WindowNode extends AbstractStageNode {
+  @ProtoProperties
+  public List<RexExpression> _groupSet;

Review Comment:
   Could you add a test to `SerDeUtilsTest` to ensure that WindowNode is serialized and deserialized properly?
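
   For reference, a minimal sketch of the round-trip property such a test would check: every annotated field survives serialize followed by deserialize. Everything here is a hypothetical stand-in (`Prop` for an annotation like `@ProtoProperties`, `FakeWindowNode` for the node, and the map-based `serialize`/`deserialize` helpers); none of it is Pinot's actual SerDe API.

   ```java
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.reflect.Field;
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class RoundTripSketch {
     // Hypothetical marker annotation; stands in for something like @ProtoProperties.
     @Retention(RetentionPolicy.RUNTIME)
     public @interface Prop { }

     // Hypothetical node with two annotated fields and one unannotated field.
     public static class FakeWindowNode {
       @Prop public List<String> _groupSet;
       @Prop public int _lowerBound;
       public String _notSerialized;
     }

     // Copies every annotated field into a name -> value map.
     public static Map<String, Object> serialize(Object node) {
       Map<String, Object> out = new LinkedHashMap<>();
       try {
         for (Field f : node.getClass().getDeclaredFields()) {
           if (f.isAnnotationPresent(Prop.class)) {
             out.put(f.getName(), f.get(node));
           }
         }
       } catch (IllegalAccessException e) {
         throw new IllegalStateException(e);
       }
       return out;
     }

     // Rebuilds a node of the given class from the map, touching only annotated fields.
     public static <T> T deserialize(Map<String, Object> data, Class<T> clazz) {
       try {
         T node = clazz.getDeclaredConstructor().newInstance();
         for (Field f : clazz.getDeclaredFields()) {
           if (f.isAnnotationPresent(Prop.class)) {
             f.set(node, data.get(f.getName()));
           }
         }
         return node;
       } catch (ReflectiveOperationException e) {
         throw new IllegalStateException(e);
       }
     }

     // The round-trip check: annotated fields are preserved, unannotated ones are dropped.
     public static boolean roundTripPreservesAnnotatedFields() {
       FakeWindowNode original = new FakeWindowNode();
       original._groupSet = Arrays.asList("col2", "col1");
       original._lowerBound = -1;
       original._notSerialized = "scratch";

       FakeWindowNode copy = deserialize(serialize(original), FakeWindowNode.class);
       return original._groupSet.equals(copy._groupSet)
           && original._lowerBound == copy._lowerBound
           && copy._notSerialized == null;
     }

     public static void main(String[] args) {
       System.out.println(roundTripPreservesAnnotatedFields());
     }
   }
   ```

   The real test would go through the project's own serialization utilities instead of these helpers; the assertion shape (compare each field after the round trip) is the part that carries over.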



##########
pinot-query-planner/src/test/resources/queries/WindowFunction.json:
##########
@@ -0,0 +1,1181 @@
+{
+  "window_function_planning_tests": {
+    "queries": [
+      {
+        "description": "single empty OVER() only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a ORDER BY a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER() FROM a WHERE a.col3 > 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 10)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), MIN(a.col3) OVER() FROM a where a.col1 IN ('foo', 'bar')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(=($2, 'bar'), =($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with default frame on one but not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER() FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER(), MAX(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(), COUNT(a.col1) OVER() FROM a WHERE a.col3 > 100",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 100)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT LENGTH(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(), MAX(a.col3) OVER() FROM a where a.col1 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[LENGTH(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($2, 'bar'), <>($2, 'baz'), <>($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
+          "\n          LogicalExchange(distribution=[hash[2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(PARTITION BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(PARTITION BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key only",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1), COUNT(a.col2) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1), MIN(a.col3) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key with default frame for one and not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col1",

Review Comment:
   Could you add another case with the `PARTITION BY` columns in reversed order? The two are essentially the same, right?
   ```
   SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col1, a.col2)
   ```
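
   A small sketch of why the two orderings should be equivalent, assuming partition keys are tracked as a set of input-column indexes (the plans in this file print them that way, e.g. `partition {0}`). The indexes 0 and 2 below are illustrative, taken from a projection of the form `col2=[$0], col3=[$1], col1=[$2]`; `java.util.BitSet` is only a stand-in for whatever key-set type the planner uses.

   ```java
   import java.util.BitSet;

   public class PartitionKeySketch {
     // Builds an order-insensitive key set from input-column indexes.
     public static BitSet keySet(int... columnIndexes) {
       BitSet bits = new BitSet();
       for (int index : columnIndexes) {
         bits.set(index);
       }
       return bits;
     }

     public static void main(String[] args) {
       BitSet first = keySet(0, 2);   // PARTITION BY a.col2, a.col1
       BitSet second = keySet(2, 0);  // PARTITION BY a.col1, a.col2
       System.out.println(first.equals(second));  // prints true
       System.out.println(first);                 // prints {0, 2}
     }
   }
   ```

   Whether the planner actually folds both functions into a single window group is what the extra test case would confirm.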



##########
pinot-query-planner/src/test/resources/queries/WindowFunction.json:
##########
@@ -0,0 +1,1181 @@
+{
+  "window_function_planning_tests": {
+    "queries": [
+      {
+        "description": "single empty OVER() only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a ORDER BY a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER() FROM a WHERE a.col3 > 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 10)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), MIN(a.col3) OVER() FROM a where a.col1 IN ('foo', 'bar')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(=($2, 'bar'), =($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with default frame on one but not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER() FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER(), MAX(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(), COUNT(a.col1) OVER() FROM a WHERE a.col3 > 100",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 100)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT LENGTH(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(), MAX(a.col3) OVER() FROM a where a.col1 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[LENGTH(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($2, 'bar'), <>($2, 'baz'), <>($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
+          "\n          LogicalExchange(distribution=[hash[2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(PARTITION BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(PARTITION BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key only",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1), COUNT(a.col2) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1), MIN(a.col3) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key with default frame for one and not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col1",

Review Comment:
   Add another case where the partition-by key is used in ORDER BY but not in SELECT, e.g. change `ORDER BY a.col1` to `ORDER BY a.col2`.
   --> The plan should select out both col1 and col2, and
   --> only the final RelRoot (which encapsulates the RelNode) will trim col2 out.
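
   A minimal sketch of what such a test entry might look like, reusing the query on the commented line with only the ORDER BY column changed. The description wording is an assumption, and the "output" array is deliberately left out here: the expected plan should be captured from the planner itself rather than written by hand.

   ```json
   {
     "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by on a partition key not in select",
     "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col2"
   }
   ```

   If the behavior described above holds, the captured plan would be expected to carry col2 through the projection below the sort, with col2 dropped only at the RelRoot level.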



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
