You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/02/08 18:28:05 UTC

[pinot] branch master updated: [multistage] Initial (phase 1) query planner support for window functions (#10228)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9a9df8ca [multistage] Initial (phase 1) query planner support for window functions (#10228)
3c9a9df8ca is described below

commit 3c9a9df8ca33c64be026f4b4d37021090c750bce
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Wed Feb 8 10:27:49 2023 -0800

    [multistage] Initial (phase 1) query planner support for window functions (#10228)
    
    * Initial (phase 1) planner changes for window functions in multi-stage engine
    
    * Add some group by tests with aggregation used within window order by clause for window functions
    
    * Address review comments
    
    * Address review comments
---
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   12 +-
 .../rules/PinotWindowExchangeNodeInsertRule.java   |  166 +++
 .../query/planner/ExplainPlanStageVisitor.java     |    6 +
 .../query/planner/logical/RelToStageConverter.java |   10 +-
 .../pinot/query/planner/logical/RexExpression.java |    9 +
 .../planner/logical/ShuffleRewriteVisitor.java     |    6 +
 .../planner/logical/StageMetadataVisitor.java      |    8 +
 .../colocated/GreedyShuffleRewriteVisitor.java     |    6 +
 .../pinot/query/planner/stage/AggregateNode.java   |    9 +-
 .../stage/DefaultPostOrderTraversalVisitor.java    |    6 +
 .../query/planner/stage/StageNodeSerDeUtils.java   |    2 +
 .../query/planner/stage/StageNodeVisitor.java      |    2 +
 .../pinot/query/planner/stage/WindowNode.java      |  157 +++
 .../pinot/query/QueryEnvironmentTestBase.java      |   52 +
 .../query/queries/ResourceBasedQueryPlansTest.java |  177 +++
 .../src/test/resources/queries/AggregatePlans.json |   49 +
 .../test/resources/queries/BasicQueryPlans.json    |   63 +
 .../src/test/resources/queries/GroupByPlans.json   |   97 ++
 .../src/test/resources/queries/JoinPlans.json      |  236 ++++
 .../src/test/resources/queries/OrderByPlans.json   |   86 ++
 .../resources/queries/WindowFunctionPlans.json     | 1488 ++++++++++++++++++++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |    7 +
 .../runtime/plan/ServerRequestPlanVisitor.java     |    6 +
 23 files changed, 2649 insertions(+), 11 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 1a3589e1c9..f93e16b637 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -34,8 +34,8 @@ public class PinotQueryRuleSets {
 
   public static final Collection<RelOptRule> LOGICAL_OPT_RULES =
       Arrays.asList(EnumerableRules.ENUMERABLE_FILTER_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE,
-          EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_SORT_RULE,
-          EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
+          EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_WINDOW_RULE,
+          EnumerableRules.ENUMERABLE_SORT_RULE, EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
 
           // push a filter into a join
           CoreRules.FILTER_INTO_JOIN,
@@ -62,6 +62,11 @@ public class PinotQueryRuleSets {
           // reorder sort and projection
           CoreRules.SORT_PROJECT_TRANSPOSE,
 
+          // convert OVER aggregate to logical WINDOW
+          CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW,
+          // push project through WINDOW
+          CoreRules.PROJECT_WINDOW_TRANSPOSE,
+
           // TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
 
           // join rules
@@ -99,6 +104,7 @@ public class PinotQueryRuleSets {
           PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,
 
           PinotJoinExchangeNodeInsertRule.INSTANCE,
-          PinotAggregateExchangeNodeInsertRule.INSTANCE
+          PinotAggregateExchangeNodeInsertRule.INSTANCE,
+          PinotWindowExchangeNodeInsertRule.INSTANCE
       );
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..e51c5c373a
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot, this rule is fixed to always insert an exchange or sort exchange below the WINDOW node.
+ * TODO:
+ *     1. Add support for more than one window group
+ *     2. Add support for functions other than aggregation functions (AVG, COUNT, MAX, MIN, SUM)
+ *     3. Add support for custom frames
+ */
+public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotWindowExchangeNodeInsertRule INSTANCE =
+      new PinotWindowExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  // Supported window functions
+  private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
+      SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
+
+  public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) {
+    super(operand(LogicalWindow.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1) {
+      return false;
+    }
+    if (call.rel(0) instanceof Window) {
+      Window window = call.rel(0);
+      // Only run the rule if the input isn't already an exchange node
+      return !PinotRuleUtils.isExchange(window.getInput());
+    }
+    return false;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Window window = call.rel(0);
+    RelNode windowInput = window.getInput();
+
+    // Perform all validations
+    validateWindows(window);
+
+    Window.Group windowGroup = window.groups.get(0);
+    if (windowGroup.keys.isEmpty() && windowGroup.orderKeys.getKeys().isEmpty()) {
+      // Empty OVER()
+      // Add a single LogicalExchange for empty OVER() since no sort is required
+      LogicalExchange exchange = LogicalExchange.create(windowInput, RelDistributions.hash(Collections.emptyList()));
+      call.transformTo(
+          LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), window.groups));
+    } else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
+      // Only ORDER BY
+      // Add a LogicalSortExchange with collation on the order by key(s) and an empty hash partition key
+      LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
+          RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys);
+      call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
+          window.groups));
+    } else {
+      // All other variants
+      // Assess whether this is a PARTITION BY only query or not (includes queries of the type where PARTITION BY and
+      // ORDER BY key(s) are the same)
+      boolean isPartitionByOnly = isPartitionByOnlyQuery(windowGroup);
+
+      if (isPartitionByOnly) {
+        // Only PARTITION BY or PARTITION BY and ORDER BY on the same key(s)
+        // Add a LogicalExchange hashed on the partition by keys
+        LogicalExchange exchange = LogicalExchange.create(windowInput,
+            RelDistributions.hash(windowGroup.keys.toList()));
+        call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(),
+            window.groups));
+      } else {
+        // PARTITION BY and ORDER BY on different key(s)
+        // Add a LogicalSortExchange hashed on the partition by keys and collation based on order by keys
+        LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
+            RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys);
+        call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
+            window.groups));
+      }
+    }
+  }
+
+  private void validateWindows(Window window) {
+    int numGroups = window.groups.size();
+    // For Phase 1 we only handle single window groups
+    Preconditions.checkState(numGroups <= 1,
+        String.format("Currently only 1 window group is supported, query has %d groups", numGroups));
+
+    // Validate that only supported window aggregation functions are present
+    Window.Group windowGroup = window.groups.get(0);
+    validateWindowAggCallsSupported(windowGroup);
+
+    // Validate the frame
+    validateWindowFrames(windowGroup);
+  }
+
+  private void validateWindowAggCallsSupported(Window.Group windowGroup) {
+    for (int i = 0; i < windowGroup.aggCalls.size(); i++) {
+      Window.RexWinAggCall aggCall = windowGroup.aggCalls.get(i);
+      SqlKind aggKind = aggCall.getKind();
+      Preconditions.checkState(SUPPORTED_WINDOW_FUNCTION_KIND.contains(aggKind),
+          String.format("Unsupported Window function kind: %s. Only aggregation functions are supported!", aggKind));
+    }
+  }
+
+  private void validateWindowFrames(Window.Group windowGroup) {
+    // For Phase 1 only the default frame is supported
+    Preconditions.checkState(!windowGroup.isRows, "Default frame must be of type RANGE and not ROWS");
+    Preconditions.checkState(windowGroup.lowerBound.isPreceding() && windowGroup.lowerBound.isUnbounded(),
+        String.format("Lower bound must be UNBOUNDED PRECEDING but it is: %s", windowGroup.lowerBound));
+    if (windowGroup.orderKeys.getKeys().isEmpty()) {
+      Preconditions.checkState(windowGroup.upperBound.isFollowing() && windowGroup.upperBound.isUnbounded(),
+          String.format("Upper bound must be UNBOUNDED PRECEDING but it is: %s", windowGroup.upperBound));
+    } else {
+      Preconditions.checkState(windowGroup.upperBound.isCurrentRow(),
+          String.format("Upper bound must be CURRENT ROW but it is: %s", windowGroup.upperBound));
+    }
+  }
+
+  private boolean isPartitionByOnlyQuery(Window.Group windowGroup) {
+    boolean isPartitionByOnly = false;
+    if (windowGroup.orderKeys.getKeys().isEmpty()) {
+      return true;
+    }
+
+    if (windowGroup.orderKeys.getKeys().size() == windowGroup.keys.asList().size()) {
+      Set<Integer> partitionByKeyList = new HashSet<>(windowGroup.keys.toList());
+      Set<Integer> orderByKeyList = new HashSet<>(windowGroup.orderKeys.getKeys());
+      isPartitionByOnly = partitionByKeyList.equals(orderByKeyList);
+    }
+    return isPartitionByOnly;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
index ccc71858cd..1b4b46795d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.query.routing.VirtualServer;
 
 
@@ -112,6 +113,11 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
     return visitSimpleNode(node, context);
   }
 
+  /**
+   * Explains a window node by delegating to {@code visitSimpleNode}, the same helper the neighbouring
+   * single-node visit methods in this visitor call.
+   */
+  @Override
+  public StringBuilder visitWindow(WindowNode node, Context context) {
+    return visitSimpleNode(node, context);
+  }
+
   @Override
   public StringBuilder visitFilter(FilterNode node, Context context) {
     return visitSimpleNode(node, context);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index cede6a38e3..eb24da7e8e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.logical.LogicalWindow;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
@@ -44,6 +45,7 @@ import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -79,6 +81,8 @@ public final class RelToStageConverter {
       return convertLogicalSort((LogicalSort) node, currentStageId);
     } else if (node instanceof LogicalValues) {
       return convertLogicalValues((LogicalValues) node, currentStageId);
+    } else if (node instanceof LogicalWindow) {
+      return convertLogicalWindow((LogicalWindow) node, currentStageId);
     } else {
       throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
     }
@@ -88,6 +92,10 @@ public final class RelToStageConverter {
     return new ValueNode(currentStageId, toDataSchema(node.getRowType()), node.tuples);
   }
 
+  /**
+   * Converts a {@link LogicalWindow} into a {@link WindowNode} for the given stage, passing along the window
+   * groups, the window constants and the data schema derived from the node's row type.
+   */
+  private static StageNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
+    return new WindowNode(currentStageId, node.groups, node.constants, toDataSchema(node.getRowType()));
+  }
+
   private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
     int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
     int offset = RexExpressionUtils.getValueAsInt(node.offset);
@@ -97,7 +105,7 @@ public final class RelToStageConverter {
 
   private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
     return new AggregateNode(currentStageId, toDataSchema(node.getRowType()), node.getAggCallList(),
-        node.getGroupSet(), node.getHints());
+        RexExpression.toRexInputRefs(node.getGroupSet()), node.getHints());
   }
 
   private static StageNode convertLogicalProject(LogicalProject node, int currentStageId) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index 7ef42ff7c9..abe1738fc6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.planner.logical;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -99,6 +100,14 @@ public interface RexExpression {
     }
   }
 
+  /**
+   * Wraps every index produced by the given iterable in an {@link InputRef}, keeping the iteration order.
+   *
+   * @param bitset field indexes to reference
+   * @return a new mutable list with one input reference per index
+   */
+  static List<RexExpression> toRexInputRefs(Iterable<Integer> bitset) {
+    List<RexExpression> inputRefs = new ArrayList<>();
+    bitset.forEach(fieldIndex -> inputRefs.add(new RexExpression.InputRef(fieldIndex)));
+    return inputRefs;
+  }
+
   class InputRef implements RexExpression {
     @ProtoProperties
     private SqlKind _sqlKind;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index b881982fe8..12fa9d6cac 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -37,6 +37,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 
 
 /**
@@ -73,6 +74,11 @@ public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Voi
     return deriveNewPartitionKeysFromRexExpressions(groupSet, oldPartitionKeys);
   }
 
+  /**
+   * Window nodes are not handled by this visitor yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Set<Integer> visitWindow(WindowNode node, Void context) {
+    throw new UnsupportedOperationException("Window not yet supported!");
+  }
+
   @Override
   public Set<Integer> visitFilter(FilterNode node, Void context) {
     // filters don't change partition keys
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
index cf76134c8a..a79b2159c6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 
 
 /**
@@ -68,6 +69,13 @@ public class StageMetadataVisitor implements StageNodeVisitor<Void, QueryPlan> {
     return null;
   }
 
+  @Override
+  public Void visitWindow(WindowNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+    return null;
+  }
+
   @Override
   public Void visitFilter(FilterNode node, QueryPlan context) {
     node.getInputs().get(0).visit(this, context);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index b37e250f14..2ba7dedbbf 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -44,6 +44,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -245,6 +246,11 @@ public class GreedyShuffleRewriteVisitor
     return node.getInputs().get(0).visit(this, context);
   }
 
+  @Override
+  public Set<ColocationKey> visitWindow(WindowNode node, GreedyShuffleRewriteContext context) {
+    return node.getInputs().get(0).visit(this, context);
+  }
+
   @Override
   public Set<ColocationKey> visitTableScan(TableScanNode node, GreedyShuffleRewriteContext context) {
     TableConfig tableConfig =
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
index 251b7986d3..d2b2742b6e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pinot.query.planner.stage;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -41,14 +39,11 @@ public class AggregateNode extends AbstractStageNode {
     super(stageId);
   }
 
-  public AggregateNode(int stageId, DataSchema dataSchema, List<AggregateCall> aggCalls, ImmutableBitSet groupSet,
+  public AggregateNode(int stageId, DataSchema dataSchema, List<AggregateCall> aggCalls, List<RexExpression> groupSet,
       List<RelHint> relHints) {
     super(stageId, dataSchema);
     _aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
-    _groupSet = new ArrayList<>(groupSet.cardinality());
-    for (Integer integer : groupSet) {
-      _groupSet.add(new RexExpression.InputRef(integer));
-    }
+    _groupSet = groupSet;
     _relHints = relHints;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
index 982a3a6db5..d41b0fe0bc 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
@@ -78,4 +78,10 @@ public abstract class DefaultPostOrderTraversalVisitor<T, C> implements StageNod
   public T visitValue(ValueNode node, C context) {
     return process(node, context);
   }
+
+  @Override
+  public T visitWindow(WindowNode node, C context) {
+    node.getInputs().get(0).visit(this, context);
+    return process(node, context);
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 69de8bcba6..709c085170 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -83,6 +83,8 @@ public final class StageNodeSerDeUtils {
         return new MailboxReceiveNode(stageId);
       case "ValueNode":
         return new ValueNode(stageId);
+      case "WindowNode":
+        return new WindowNode(stageId);
       default:
         throw new IllegalArgumentException("Unknown node name: " + nodeName);
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
index 614dbb877a..593cd336fc 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -54,4 +54,6 @@ public interface StageNodeVisitor<T, C> {
   T visitTableScan(TableScanNode node, C context);
 
   T visitValue(ValueNode node, C context);
+
+  T visitWindow(WindowNode node, C context);
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
new file mode 100644
index 0000000000..1256987e7f
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import com.clearspring.analytics.util.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class WindowNode extends AbstractStageNode {
+  @ProtoProperties
+  public List<RexExpression> _groupSet;
+  @ProtoProperties
+  public List<RexExpression> _orderSet;
+  @ProtoProperties
+  public List<RelFieldCollation.Direction> _orderSetDirection;
+  @ProtoProperties
+  public List<RelFieldCollation.NullDirection> _orderSetNullDirection;
+  @ProtoProperties
+  public List<RexExpression> _aggCalls;
+  @ProtoProperties
+  public int _lowerBound;
+  @ProtoProperties
+  public int _upperBound;
+  @ProtoProperties
+  public boolean _isRows;
+  @ProtoProperties
+  private List<RexExpression> _constants;
+
+  public WindowNode(int stageId) {
+    super(stageId);
+  }
+
+  public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral> constants, DataSchema dataSchema) {
+    super(stageId, dataSchema);
+    // Only a single Window Group should exist per WindowNode.
+    Preconditions.checkState(windowGroups.size() == 1,
+        String.format("Only a single window group is allowed! Number of window groups: %d", windowGroups.size()));
+    Window.Group windowGroup = windowGroups.get(0);
+
+    _groupSet = windowGroup.keys == null ? new ArrayList<>() : RexExpression.toRexInputRefs(windowGroup.keys);
+    List<RelFieldCollation> relFieldCollations = windowGroup.orderKeys == null ? new ArrayList<>()
+        : windowGroup.orderKeys.getFieldCollations();
+    _orderSet = new ArrayList<>(relFieldCollations.size());
+    _orderSetDirection = new ArrayList<>(relFieldCollations.size());
+    _orderSetNullDirection = new ArrayList<>(relFieldCollations.size());
+    for (RelFieldCollation relFieldCollation : relFieldCollations) {
+      _orderSet.add(new RexExpression.InputRef(relFieldCollation.getFieldIndex()));
+      _orderSetDirection.add(relFieldCollation.direction);
+      _orderSetNullDirection.add(relFieldCollation.nullDirection);
+    }
+    _aggCalls = windowGroup.aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
+
+    // TODO: For now only the default frame is supported. Add support for custom frames including rows support.
+    //       Frame literals come in the constants from the LogicalWindow and the bound.getOffset() stores the
+    //       InputRef to the constants array offset by the input array length. These need to be extracted here and
+    //       set to the bounds.
+    validateFrameBounds(windowGroup.lowerBound, windowGroup.upperBound, windowGroup.isRows);
+    // Lower bound can only be unbounded preceding for now, set to Integer.MIN_VALUE
+    _lowerBound = Integer.MIN_VALUE;
+    // Upper bound can only be unbounded following or current row for now
+    _upperBound = windowGroup.upperBound.isUnbounded() ? Integer.MAX_VALUE : 0;
+    _isRows = windowGroup.isRows;
+
+    // TODO: Constants are used to store constants needed such as the frame literals. For now just save this, need to
+    //       extract the constant values into bounds as a part of frame support.
+    _constants = new ArrayList<>();
+    for (RexLiteral constant : constants) {
+      _constants.add(RexExpression.toRexExpression(constant));
+    }
+  }
+
+  @Override
+  public String explain() {
+    return "WINDOW";
+  }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitWindow(this, context);
+  }
+
+  public List<RexExpression> getGroupSet() {
+    return _groupSet;
+  }
+
+  public List<RexExpression> getOrderSet() {
+    return _orderSet;
+  }
+
+  public List<RelFieldCollation.Direction> getOrderSetDirection() {
+    return _orderSetDirection;
+  }
+
+  public List<RelFieldCollation.NullDirection> getOrderSetNullDirection() {
+    return _orderSetNullDirection;
+  }
+
+  public List<RexExpression> getAggCalls() {
+    return _aggCalls;
+  }
+
+  public int getLowerBound() {
+    return _lowerBound;
+  }
+
+  public int getUpperBound() {
+    return _upperBound;
+  }
+
+  public boolean isRows() {
+    return _isRows;
+  }
+
+  public List<RexExpression> getConstants() {
+    return _constants;
+  }
+
+  private void validateFrameBounds(RexWindowBound lowerBound, RexWindowBound upperBound, boolean isRows) {
+    Preconditions.checkState(!isRows, "Only default frame is supported which must be RANGE and not ROWS");
+    Preconditions.checkState(lowerBound.isPreceding() && lowerBound.isUnbounded()
+            && lowerBound.getOffset() == null,
+        String.format("Only default frame is supported, actual lower bound frame provided: %s", lowerBound));
+    if (_orderSet.isEmpty()) {
+      Preconditions.checkState(upperBound.isFollowing() && upperBound.isUnbounded()
+              && upperBound.getOffset() == null,
+          String.format("Only default frame is supported, actual upper bound frame provided: %s", upperBound));
+    } else {
+      Preconditions.checkState(upperBound.isCurrentRow() && upperBound.getOffset() == null,
+          String.format("Only default frame is supported, actual upper bound frame provided: %s", upperBound));
+    }
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 7d761bac4f..3a69d3dceb 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.query;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
@@ -96,6 +98,17 @@ public class QueryEnvironmentTestBase {
             + " WHERE a.col3 >= 0 GROUP BY a.col2, a.col3"},
         new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 IN ('foo', 'bar') AND"
             + " b.col2 NOT IN ('alice', 'charlie')"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER () FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2) FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2 ORDER BY a.col2) FROM a"},
+        new Object[]{"SELECT a.col1, AVG(a.col3) OVER (), SUM(a.col3) OVER () FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER () FROM a WHERE a.col3 >= 0"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2), MIN(a.col3) OVER (PARTITION BY a.col2) "
+            + "FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2, a.col1) FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2, a.col1), MIN(a.col3) OVER (ORDER BY a.col2, "
+            + "a.col1) FROM a"},
+        new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2), MIN(a.col3) OVER (ORDER BY a.col2) FROM a"},
     };
   }
 
@@ -121,4 +134,43 @@ public class QueryEnvironmentTestBase {
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
         new WorkerManager("localhost", reducerPort, routingManager), tableCache);
   }
+
+  /**
+   * JSON test case definition for query planner test cases. Tables and schemas will come from those already defined
+   * and part of the {@code QueryEnvironment} in this base and are not part of the JSON definition for now.
+   * Unknown JSON properties are ignored on both the test case and the individual queries.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class QueryPlanTestCase {
+    // ignores the entire query test case
+    @JsonProperty("ignored")
+    public boolean _ignored;
+    // the individual queries that make up this test case
+    @JsonProperty("queries")
+    public List<Query> _queries;
+
+    @Override
+    public String toString() {
+      return "QueryPlanTestCase{" + "_ignored=" + _ignored + ", _queries=" + _queries + '}';
+    }
+
+    /**
+     * A single query of a test case, bound from the JSON properties named in the annotations below.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class Query {
+      // ignores just a single query test from the test case
+      @JsonProperty("ignored")
+      public boolean _ignored;
+      // the SQL text of the query
+      @JsonProperty("sql")
+      public String _sql;
+      // free-form description of the query
+      @JsonProperty("description")
+      public String _description;
+      // expected output fragments; stays null when the JSON has no "output" property
+      @JsonProperty("output")
+      public List<String> _output;
+      // expected exception message pattern; stays null when the JSON has no "expectedException" property
+      @JsonProperty("expectedException")
+      public String _expectedException;
+
+      @Override
+      public String toString() {
+        // Each label matches the name of the field it prints.
+        return "Query{" + "_ignored=" + _ignored + ", _sql='" + _sql + '\'' + ", _description='" + _description + '\''
+            + ", _output=" + _output + ", _expectedException='" + _expectedException + '\'' + '}';
+      }
+    }
+  }
 }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
new file mode 100644
index 0000000000..50218e1d4c
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.queries;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
+  private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
+
+  /**
+   * Asserts that explaining {@code query} produces exactly {@code output}, then removes every occurrence of
+   * "EXPLAIN PLAN FOR " from the query and asserts that planning the result yields a non-null {@link QueryPlan}.
+   * Any exception raised along the way is reported as a test failure carrying the test case name and query.
+   */
+  @Test(dataProvider = "testResourceQueryPlannerTestCaseProviderHappyPath")
+  public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String query, String output) {
+    String mismatchMessage =
+        String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output);
+    String plainQuery = query.replace("EXPLAIN PLAN FOR ", "");
+    String nullPlanMessage =
+        String.format("Test case %s for query %s should not have a null QueryPlan", testCaseName, plainQuery);
+    try {
+      Assert.assertEquals(_queryEnvironment.explainQuery(query), output, mismatchMessage);
+      QueryPlan plan = _queryEnvironment.planQuery(plainQuery);
+      Assert.assertNotNull(plan, nullPlanMessage);
+    } catch (Exception e) {
+      Assert.fail("Test case: " + testCaseName + " failed to explain query: " + query, e);
+    }
+  }
+
+  /**
+   * Asserts that explaining {@code query} throws an exception whose message fully matches the regular expression
+   * {@code expectedException}. The test fails when no exception is thrown; when {@code expectedException} is
+   * {@code null} the caught exception is rethrown unchanged.
+   */
+  @Test(dataProvider = "testResourceQueryPlannerTestCaseProviderExceptions")
+  public void testQueryExplainPlansWithExceptions(String testCaseName, String query, String expectedException) {
+    try {
+      _queryEnvironment.explainQuery(query);
+      Assert.fail("Query compilation should have failed with exception message pattern: " + expectedException);
+    } catch (Exception e) {
+      if (expectedException == null) {
+        throw e;
+      }
+      String actualMessage = e.getMessage();
+      // An exception without a message can never match the pattern; fail with a readable assertion instead of
+      // the NullPointerException that Pattern.matcher(null) would raise.
+      Assert.assertNotNull(actualMessage,
+          String.format("Caught exception of type '%s' without a message for test case '%s', expected pattern '%s'.",
+              e.getClass().getName(), testCaseName, expectedException));
+      Pattern pattern = Pattern.compile(expectedException);
+      Assert.assertTrue(pattern.matcher(actualMessage).matches(),
+          String.format("Caught exception '%s' for test case '%s', but it did not match the expected pattern '%s'.",
+              actualMessage, testCaseName, expectedException));
+    }
+  }
+
+  /**
+   * Supplies one {@code {testCaseName, sql, expectedOutput}} row per query that is not ignored, declares no
+   * expected exception and has an output list. The output entries are concatenated without a separator. Test
+   * cases flagged as ignored are skipped entirely.
+   */
+  @DataProvider
+  private static Object[][] testResourceQueryPlannerTestCaseProviderHappyPath()
+      throws Exception {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<String, QueryPlanTestCase> entry : getTestCases().entrySet()) {
+      QueryPlanTestCase testCase = entry.getValue();
+      if (testCase._ignored) {
+        continue;
+      }
+      for (QueryPlanTestCase.Query queryCase : testCase._queries) {
+        boolean expectsPlan =
+            !queryCase._ignored && queryCase._expectedException == null && queryCase._output != null;
+        if (expectsPlan) {
+          rows.add(new Object[]{entry.getKey(), queryCase._sql, StringUtils.join(queryCase._output, "")});
+        }
+      }
+    }
+    return rows.toArray(new Object[0][]);
+  }
+
+  /**
+   * Supplies one {@code {testCaseName, sql, expectedException}} row per query that is not ignored and declares an
+   * expected exception. Test cases flagged as ignored are skipped entirely.
+   */
+  @DataProvider
+  private static Object[][] testResourceQueryPlannerTestCaseProviderExceptions()
+      throws Exception {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<String, QueryPlanTestCase> entry : getTestCases().entrySet()) {
+      QueryPlanTestCase testCase = entry.getValue();
+      if (testCase._ignored) {
+        continue;
+      }
+      for (QueryPlanTestCase.Query queryCase : testCase._queries) {
+        if (!queryCase._ignored && queryCase._expectedException != null) {
+          rows.add(new Object[]{entry.getKey(), queryCase._sql, queryCase._expectedException});
+        }
+      }
+    }
+    return rows.toArray(new Object[0][]);
+  }
+
+  /**
+   * Loads the JSON test-case files found under the {@code queries} classpath folder and merges them into a single
+   * map keyed by test case name. When the {@code pinot.fileFilter} system property is set, only files whose name
+   * contains that value (case-insensitively) are loaded. Resources that do not resolve to an existing local file
+   * are skipped.
+   *
+   * @return all loaded test cases keyed by name
+   * @throws IllegalStateException if the resource folder cannot be found on the classpath
+   * @throws IllegalArgumentException if the same test case name appears in more than one file
+   * @throws Exception if a file cannot be read or parsed
+   */
+  private static Map<String, QueryPlanTestCase> getTestCases()
+      throws Exception {
+    Map<String, QueryPlanTestCase> testCaseMap = new HashMap<>();
+    ClassLoader classLoader = ResourceBasedQueryPlansTest.class.getClassLoader();
+    // Get all test files: the folder resource is read line by line and each line is treated as a file name.
+    List<String> testFilenames = new ArrayList<>();
+    try (InputStream in = classLoader.getResourceAsStream(QUERY_TEST_RESOURCE_FOLDER)) {
+      if (in == null) {
+        throw new IllegalStateException(
+            "Test resource folder not found on the classpath: " + QUERY_TEST_RESOURCE_FOLDER);
+      }
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+        String resource;
+        while ((resource = br.readLine()) != null) {
+          testFilenames.add(resource);
+        }
+      }
+    }
+
+    // get filter if set; lower-cased once with a fixed locale so matching does not depend on the default locale
+    String property = System.getProperty(FILE_FILTER_PROPERTY);
+    String filter = property == null ? null : property.toLowerCase(Locale.ROOT);
+
+    // Load each test file.
+    for (String testCaseName : testFilenames) {
+      if (filter != null && !testCaseName.toLowerCase(Locale.ROOT).contains(filter)) {
+        continue;
+      }
+
+      // Classpath resource names are always '/'-separated, whatever File.separator is on this platform.
+      String testCaseFile = QUERY_TEST_RESOURCE_FOLDER + "/" + testCaseName;
+      URL testFileUrl = classLoader.getResource(testCaseFile);
+      // This test only supports loading from local files; resources packaged inside a JAR are not supported.
+      if (testFileUrl == null || !"file".equals(testFileUrl.getProtocol())) {
+        continue;
+      }
+      // Go through a URI so percent-encoded characters in the path (e.g. spaces) are decoded.
+      File testFile = new File(testFileUrl.toURI());
+      if (!testFile.exists()) {
+        continue;
+      }
+
+      Map<String, QueryPlanTestCase> testCases = MAPPER.readValue(testFile,
+          new TypeReference<Map<String, QueryPlanTestCase>>() { });
+      // Reject test case names that were already loaded from an earlier file.
+      HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
+      hashSet.retainAll(testCases.keySet());
+      if (!hashSet.isEmpty()) {
+        throw new IllegalArgumentException("testCase already exist for the following names: " + hashSet);
+      }
+      testCaseMap.putAll(testCases);
+    }
+    return testCaseMap;
+  }
+}
diff --git a/pinot-query-planner/src/test/resources/queries/AggregatePlans.json b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
new file mode 100644
index 0000000000..64a767763b
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
@@ -0,0 +1,49 @@
+{
+  "aggregates_planning_tests": {
+    "queries": [
+      {
+        "description": "Select aggregates",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3), COUNT(a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
+          "\n  LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalAggregate(group=[{}], EXPR$0=[$SUM0($1)], agg#1=[COUNT()])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select aggregates with filters",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a'",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
+          "\n  LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalAggregate(group=[{}], EXPR$0=[$SUM0($1)], agg#1=[COUNT()])",
+          "\n        LogicalProject(col2=[$0], col3=[$1])",
+          "\n          LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select aggregates with filters and select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) as sum, COUNT(*) as count FROM a WHERE a.col3 >= 0 AND a.col2 = 'pink floyd'",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(sum=[CASE(=($1, 0), null:INTEGER, $0)], count=[$1])",
+          "\n  LogicalAggregate(group=[{}], sum=[$SUM0($0)], agg#1=[$SUM0($1)])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalAggregate(group=[{}], sum=[$SUM0($1)], agg#1=[COUNT()])",
+          "\n        LogicalProject(col2=[$0], col3=[$1])",
+          "\n          LogicalFilter(condition=[AND(>=($1, 0), =($0, 'pink floyd'))])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
new file mode 100644
index 0000000000..bcca46bff7
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
@@ -0,0 +1,63 @@
+{
+  "basic_query_planning_tests": {
+    "queries": [
+      {
+        "description": "Select *",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM d",
+        "output": [
+          "Execution Plan",
+          "\nLogicalTableScan(table=[[d]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select with filters",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col3 + a.ts FROM a WHERE a.col3 >= 0 AND a.col2 = 'a'",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[+($1, $3)])",
+          "\n  LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n    LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select with filters and select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col3 + a.ts AS colsum FROM a WHERE a.col3 >= 0 AND a.col2 = 'a'",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], colsum=[+($1, $3)])",
+          "\n  LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n    LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select with transform",
+        "sql": "EXPLAIN PLAN FOR SELECT dateTrunc('DAY', ts) FROM a LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[dateTrunc('DAY', $3)])",
+          "\n  LogicalSort(offset=[0], fetch=[10])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n      LogicalSort(fetch=[10])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select with transform and select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT dateTrunc('DAY', ts) AS day FROM a LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(day=[dateTrunc('DAY', $3)])",
+          "\n  LogicalSort(offset=[0], fetch=[10])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n      LogicalSort(fetch=[10])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
new file mode 100644
index 0000000000..9cadbb71f4
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -0,0 +1,97 @@
+{
+  "group_by_planning_tests": {
+    "queries": [
+      {
+        "description": "Group by with select and aggregate column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[$SUM0($1)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[$SUM0($1)])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by count(*) with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1",
+        "notes": "TODO: Needs follow up. Project should only keep a.col1 since the other columns are pushed to the filter, but it currently keeps them all",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{2}], EXPR$1=[COUNT()])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by on 2 columns with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col1 = 'a'  GROUP BY a.col1, a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalAggregate(group=[{0, 2}], EXPR$2=[$SUM0($1)])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>=($1, 0), =($2, 'a'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by with having clause",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*), SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1 HAVING COUNT(*) > 10 AND MAX(a.col3) >= 0 AND MIN(a.col3) < 20 AND SUM(a.col3) <= 10 AND AVG(a.col3) = 5",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
+          "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =($5, 5))])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2], $f3=[$3], $f4=[$4], $f5=[CAST(/($5, $1)):INTEGER NOT NULL])",
+          "\n      LogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2], $f3=[$3], $f4=[$4], $f5=[$2])",
+          "\n        LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], EXPR$2=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalAggregate(group=[{2}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
+          "\n              LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n                LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n                  LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Group by with having clause and select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 as value1, COUNT(*) AS count, SUM(a.col3) as SUM FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY a.col1 HAVING COUNT(*) > 10 AND MAX(a.col3) >= 0 AND MIN(a.col3) < 20 AND SUM(a.col3) <= 10 AND AVG(a.col3) = 5",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
+          "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =($5, 5))])",
+          "\n    LogicalProject(col1=[$0], count=[$1], SUM=[$2], $f3=[$3], $f4=[$4], $f5=[CAST(/($5, $1)):INTEGER NOT NULL])",
+          "\n      LogicalProject(col1=[$0], count=[$1], SUM=[$2], $f3=[$3], $f4=[$4], $f5=[$2])",
+          "\n        LogicalAggregate(group=[{0}], count=[$SUM0($1)], SUM=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalAggregate(group=[{2}], count=[COUNT()], SUM=[$SUM0($1)], agg#2=[MAX($1)], agg#3=[MIN($1)])",
+          "\n              LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n                LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n                  LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
new file mode 100644
index 0000000000..c3dc7a99f2
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -0,0 +1,236 @@
+{
+  "join_planning_tests": {
+    "queries": [
+      {
+        "description": "Inner join with order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
+          "\n  LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$2], ts=[$3])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with order by and select column with alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, a.ts AS ts1, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
+          "\n  LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$2], ts=[$3])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SELECT * inner join",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalJoin(condition=[=($2, $4)], joinType=[inner])",
+          "\n  LogicalExchange(distribution=[hash[2]])",
+          "\n    LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SELECT * inner join with filter on one table",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0",
+        "output": [
+          "Execution Plan",
+          "\nLogicalJoin(condition=[=($2, $4)], joinType=[inner])",
+          "\n  LogicalExchange(distribution=[hash[2]])",
+          "\n    LogicalFilter(condition=[>=($1, 0)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SELECT * inner join with filter",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalJoin(condition=[AND(=($2, $4), >($1, $5))], joinType=[inner])",
+          "\n  LogicalExchange(distribution=[hash[2]])",
+          "\n    LogicalFilter(condition=[>=($1, 0)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SELECT * inner join on 2 columns equality",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalJoin(condition=[AND(=($2, $6), =($0, $4))], joinType=[inner])",
+          "\n  LogicalExchange(distribution=[hash[0, 2]])",
+          "\n    LogicalTableScan(table=[[a]])",
+          "\n  LogicalExchange(distribution=[hash[0, 2]])",
+          "\n    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with filter on both tables",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2  WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col1=[$2], ts=[$3])",
+          "\n        LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[<($1, 0)])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with group by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2  WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], EXPR$1=[CAST(/($1, $2)):INTEGER NOT NULL])",
+          "\n  LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
+          "\n        LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$2])",
+          "\n              LogicalFilter(condition=[AND(>=($1, 0), =($0, 'a'))])",
+          "\n                LogicalTableScan(table=[[a]])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1])",
+          "\n              LogicalFilter(condition=[<($1, 0)])",
+          "\n                LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with datetrunc transform",
+        "sql": "EXPLAIN PLAN FOR SELECT dateTrunc('DAY', a.ts + b.ts) FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[dateTrunc('DAY', +($2, $5))])",
+          "\n  LogicalJoin(condition=[AND(=($1, $4), =($0, $3))], joinType=[inner])",
+          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      LogicalProject(col2=[$0], col1=[$2], ts=[$3])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      LogicalProject(col2=[$0], col1=[$2], ts=[$3])",
+          "\n        LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with group by on 2 columns",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col3 >= 0 GROUP BY a.col2, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0, 1}])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalAggregate(group=[{0, 1}])",
+          "\n      LogicalJoin(condition=[=($2, $3)], joinType=[inner])",
+          "\n        LogicalExchange(distribution=[hash[2]])",
+          "\n          LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n            LogicalFilter(condition=[>=($1, 0)])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$2])",
+          "\n            LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with group by on 2 columns with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col3 as value3 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col3 >= 0 GROUP BY a.col2, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0, 1}])",
+          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalAggregate(group=[{0, 1}])",
+          "\n      LogicalJoin(condition=[=($2, $3)], joinType=[inner])",
+          "\n        LogicalExchange(distribution=[hash[2]])",
+          "\n          LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n            LogicalFilter(condition=[>=($1, 0)])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$2])",
+          "\n            LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with filters using IN clause",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 IN ('foo', 'bar') AND b.col2 NOT IN ('alice', 'charlie')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], col2=[$1])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col1=[$2])",
+          "\n        LogicalFilter(condition=[OR(=($0, 'bar'), =($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'alice':VARCHAR(7)), <>($0, 'charlie':VARCHAR(7)))])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Inner join with filters using IN clause with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, b.col2 as bvalue2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 IN ('foo', 'bar') AND b.col2 NOT IN ('alice', 'charlie')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], bvalue2=[$1])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col1=[$2])",
+          "\n        LogicalFilter(condition=[OR(=($0, 'bar'), =($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'alice':VARCHAR(7)), <>($0, 'charlie':VARCHAR(7)))])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      }
+    ]
+  },
+  "exception_throwing_join_planning_tests": {
+    "queries": [
+      {
+        "description": "Incorrect table",
+        "sql": "EXPLAIN PLAN FOR SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 = c.col3",
+        "expectedException": "Error explain query plan for.*"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
new file mode 100644
index 0000000000..94f8fe9ba3
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -0,0 +1,86 @@
+{
+  "order_by_planning_tests": {
+    "queries": [
+      {
+        "description": "Simple Order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2])",
+          "\n  LogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n      LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Simple Order by with alias on select column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1 FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2])",
+          "\n  LogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n      LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select * order by",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM a ORDER BY col1 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Select * order by on 2 columns with descending",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[10])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2, 0 DESC]])",
+          "\n    LogicalSort(sort0=[$2], sort1=[$0], dir0=[ASC], dir1=[DESC], fetch=[10])",
+          "\n      LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Order by and group by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a GROUP BY a.col1 ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalAggregate(group=[{2}], EXPR$1=[$SUM0($1)])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Order by and group by with alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) AS sum FROM a GROUP BY a.col1 ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
+          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalAggregate(group=[{2}], sum=[$SUM0($1)])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
new file mode 100644
index 0000000000..2199542368
--- /dev/null
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -0,0 +1,1488 @@
+{
+  "window_function_planning_tests": {
+    "queries": [
+      {
+        "description": "single empty OVER() only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) OVER() AS sum FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a ORDER BY a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER() FROM a WHERE a.col3 > 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 10)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single empty OVER() with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), MIN(a.col3) OVER() FROM a where a.col1 IN ('foo', 'bar')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(=($2, 'bar'), =($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) OVER() AS sum, COUNT(a.col2) OVER() AS count FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with default frame on one but not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER() FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), COUNT(a.col2) OVER() FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), SUM(a.col3) OVER(), MAX(a.col3) OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(), COUNT(a.col1) OVER() FROM a WHERE a.col3 > 100",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[>($1, 100)])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple empty OVER()s with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT LENGTH(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(), MAX(a.col3) OVER() FROM a where a.col1 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject(col3=[$1], $1=[LENGTH(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($2, 'bar'), <>($2, 'baz'), <>($2, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) only with alias",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2) AS sum FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col2) AS avg FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2], avg=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
+          "\n          LogicalExchange(distribution=[hash[2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(PARTITION BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(PARTITION BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key only",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1), COUNT(a.col2) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1) AS max, COUNT(a.col2) OVER(PARTITION BY a.col1) AS count FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1), MIN(a.col3) OVER(PARTITION BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col1) AS avg, MIN(a.col3) OVER(PARTITION BY a.col1) AS min FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$1], avg=[CAST(/($2, $3)):INTEGER NOT NULL], min=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key with default frame for one and not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "notes": "Calcite validation fails if RANGE is used but later Calcite overrides ROWS with RANGE",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by on select column (non-avg agg)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), MAX(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key but in reverse order and select col with global order by (non-avg agg)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), MAX(a.col3) OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by on select column (avg agg)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key but in reverse order and select col with global order by (avg agg)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($4, $5)):INTEGER NOT NULL])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), SUM($1), COUNT($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and select col with global order by on non-select column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(a.col1), SUM(a.col3) OVER(PARTITION BY a.col2), MAX(a.col3) OVER(PARTITION BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[REVERSE($2)], EXPR$1=[$3], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), MAX($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1), COUNT(a.col1) OVER(PARTITION BY a.col1) FROM a WHERE a.col3 > 42 AND a.col1 IN ('vader', 'chewbacca', 'yoda')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>($1, 42), OR(=($2, 'chewbacca':VARCHAR(9)), =($2, 'vader':VARCHAR(9)), =($2, 'yoda':VARCHAR(9))))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY)s on the same key with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(PARTITION BY a.col1), MAX(a.col3) OVER(PARTITION BY a.col1) FROM a where a.col2 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2))), COUNT(a.col1) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2))) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($3, $4)):INTEGER NOT NULL], EXPR$1=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, '-', $0))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2) AS sum FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(ORDER BY a.col2) AS avg FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2], avg=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(ORDER BY a.col1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [MAX($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(ORDER BY a.col1 DESC) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(order by [2 DESC] aggs [MIN($1)])])",
+          "\n          LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(ORDER BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(ORDER BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(ORDER BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) with transform on order by key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(ORDER BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key only",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(ORDER BY a.col1), COUNT(a.col2) OVER(ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(ORDER BY a.col1) AS max, COUNT(a.col2) OVER(ORDER BY a.col1) AS count FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(ORDER BY a.col1), MIN(a.col3) OVER(ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(ORDER BY a.col1) AS avg, MIN(a.col3) OVER(ORDER BY a.col1) AS min FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$1], avg=[CAST(/($2, $3)):INTEGER NOT NULL], min=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key with default frame for one and not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(ORDER BY a.col2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), MIN(a.col3) OVER(ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(ORDER BY a.col2, a.col1 DESC), AVG(a.col3) OVER(ORDER BY a.col2, a.col1 DESC) FROM a ORDER BY a.col1 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[0 DESC]])",
+          "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n        LogicalWindow(window#0=[window(order by [0, 2 DESC] aggs [SUM($1), COUNT($1)])])",
+          "\n          LogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(a.col1), SUM(a.col3) OVER(ORDER BY a.col2), MAX(a.col3) OVER(ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[REVERSE($2)], EXPR$1=[$3], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), MAX($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(ORDER BY a.col1), COUNT(a.col1) OVER(ORDER BY a.col1) FROM a WHERE a.col3 > 42 AND a.col1 IN ('vader', 'chewbacca', 'yoda')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>($1, 42), OR(=($2, 'chewbacca':VARCHAR(9)), =($2, 'vader':VARCHAR(9)), =($2, 'yoda':VARCHAR(9))))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY)s on the same key with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(ORDER BY a.col1), MAX(a.col3) OVER(ORDER BY a.col1) FROM a where a.col2 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(order by [1] aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(ORDER BY) with transform on order by key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(ORDER BY REVERSE(CONCAT(a.col1, '-', a.col2))), COUNT(a.col1) OVER(ORDER BY REVERSE(CONCAT(a.col1, '-', a.col2))) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($3, $4)):INTEGER NOT NULL], EXPR$1=[$5])",
+          "\n  LogicalWindow(window#0=[window(order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, '-', $0))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) only",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) only with alias",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) AS sum FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) AS avg FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2], avg=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MIN($1)])])",
+          "\n          LogicalExchange(distribution=[hash[2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(PARTITION BY a.col3 ORDER BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY CONCAT(a.col1, '-', a.col2) ORDER BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($2, $3)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key only",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1), COUNT(a.col2) OVER(PARTITION BY a.col1 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) AS max, COUNT(a.col2) OVER(PARTITION BY a.col1 ORDER BY a.col1) AS count FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) AS avg, MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) AS min FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$1], avg=[CAST(/($2, $3)):INTEGER NOT NULL], min=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key with default frame for one and not the other",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), MIN(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1 ORDER BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1 ORDER BY a.col2, a.col1) FROM a ORDER BY a.col2, a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0]])",
+          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [0, 2] aggs [SUM($1), COUNT($1)])])",
+          "\n          LogicalExchange(distribution=[hash[0, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(a.col1), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2), MAX(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[REVERSE($2)], EXPR$1=[$3], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), MAX($1)])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1), COUNT(a.col1) OVER(PARTITION BY a.col1 ORDER BY a.col1) FROM a WHERE a.col3 > 42 AND a.col1 IN ('vader', 'chewbacca', 'yoda')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[CAST(/($2, $3)):INTEGER NOT NULL], EXPR$2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>($1, 42), OR(=($2, 'chewbacca':VARCHAR(9)), =($2, 'vader':VARCHAR(9)), =($2, 'yoda':VARCHAR(9))))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the same key with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col1) FROM a where a.col2 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [MIN($0), MAX($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2)) ORDER BY REVERSE(CONCAT(a.col1, '-', a.col2))), COUNT(a.col1) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2)) ORDER BY REVERSE(CONCAT(a.col1, '-', a.col2))) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($3, $4)):INTEGER NOT NULL], EXPR$1=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[REVERSE(CONCAT($2, '-', $0))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) only",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) AS avg FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(avg=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select col with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) AS avg FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2], avg=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) with default frame",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MAX($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
+          "\n        LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1)])])",
+          "\n          LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) OVER(PARTITION BY a.col3 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [2] aggs [COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash[1]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2], $3=[SUBSTR($2, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>($1, 10), <=($1, 500))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) FROM a where a.col1 NOT IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$3], EXPR$1=[CAST(/($4, $5)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2], $3=[CONCAT($2, '-', $0)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($2, 'bar'), <>($2, 'foo')), >=($1, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY CONCAT(a.col1, '-', a.col2) ORDER BY REVERSE(a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n      LogicalProject(col3=[$1], $1=[REVERSE($0)], $2=[CONCAT($2, '-', $0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key only (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3), COUNT(a.col2) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key only (single window group) with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) AS max, COUNT(a.col2) OVER(PARTITION BY a.col1 ORDER BY a.col3) AS count FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key and select col (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL], EXPR$2=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key and select col (single window group) with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) AS avg, MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) AS min FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(value1=[$2], avg=[CAST(/($3, $4)):INTEGER NOT NULL], min=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key with default frame for one and not the other (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [COUNT($1), MIN($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key and select col with global order by (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY a.col2, a.col1 ORDER BY a.col3, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1 ORDER BY a.col3, a.col1) FROM a ORDER BY a.col2, a.col1 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0])",
+          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]])",
+          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
+          "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL], col2=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [1, 2] aggs [SUM($1), COUNT($1)])])",
+          "\n          LogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]])",
+          "\n            LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key and transform col (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(a.col1), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1), MAX(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), MAX($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2], $3=[REVERSE($2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key select col and filter (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2), COUNT(a.col1) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a WHERE a.col3 > 42 AND a.col1 IN ('vader', 'chewbacca', 'yoda')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$2], EXPR$1=[CAST(/($3, $4)):INTEGER NOT NULL], EXPR$2=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), COUNT($2)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2])",
+          "\n        LogicalFilter(condition=[AND(>($1, 42), OR(=($2, 'chewbacca':VARCHAR(9)), =($2, 'vader':VARCHAR(9)), =($2, 'yoda':VARCHAR(9))))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the same key with select transform and filter (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(CONCAT(a.col1, ' ', a.col2)), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a where a.col2 NOT IN ('foo', 'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
+          "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1), MAX($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n      LogicalProject(col2=[$0], col3=[$1], col1=[$2], $3=[REVERSE(CONCAT($2, ' ', $0))])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2) with transform on partition key (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2)) ORDER BY CONCAT(a.col1, '-', a.col2)), COUNT(a.col1) OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2)) ORDER BY CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[CAST(/($4, $5)):INTEGER NOT NULL], EXPR$1=[$6])",
+          "\n  LogicalWindow(window#0=[window(partition {3} order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
+          "\n    LogicalSortExchange(distribution=[hash[3]], collation=[[2]])",
+          "\n      LogicalProject(col3=[$1], col1=[$2], $2=[CONCAT($2, '-', $0)], $3=[REVERSE(CONCAT($2, '-', $0))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Window function with JOIN example",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, b.col1, SUM(a.col3) OVER (PARTITION BY a.col1) FROM a JOIN b ON a.col1 = b.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], col10=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0)])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col3=[$0], col1=[$1], col10=[$3])",
+          "\n        LogicalJoin(condition=[=($1, $2)], joinType=[inner])",
+          "\n          LogicalExchange(distribution=[hash[1]])",
+          "\n            LogicalProject(col3=[$1], col1=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$0], col1=[$2])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Window function with GROUP BY example with aggregation used within ORDER BY clause in OVER",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*), AVG(a.col3) OVER(ORDER BY COUNT(*) desc, a.col1 asc) from a GROUP BY a.col1, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[$2], EXPR$2=[CAST(/($3, $4)):INTEGER NOT NULL])",
+          "\n  LogicalWindow(window#0=[window(order by [2 DESC, 1] aggs [SUM($0), COUNT($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash], collation=[[2 DESC, 1]])",
+          "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalAggregate(group=[{1, 2}], EXPR$1=[COUNT()])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Window function with GROUP BY example with aggregation used within ORDER BY clause in OVER with PARTITION BY",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY COUNT(*) desc, a.col1 asc) from a GROUP BY a.col1, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$1], EXPR$1=[$2], $2=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [2 DESC, 1] aggs [MAX($0)])])",
+          "\n    LogicalSortExchange(distribution=[hash[1]], collation=[[2 DESC, 1]])",
+          "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalAggregate(group=[{1, 2}], EXPR$1=[COUNT()])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      }
+    ]
+  },
+  "exception_throwing_window_function_planning_tests": {
+    "queries": [
+      {
+        "description": "unsupported window functions such as row_number()",
+        "notes": "not yet supported",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "unsupported custom frames (ROWS BETWEEN with offset bounds)",
+        "notes": "not yet supported",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3 ROWS BETWEEN 5 PRECEDING AND 10 FOLLOWING) FROM a WHERE a.col3 >= 0",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "unsupported custom frames (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)",
+        "notes": "not yet supported",
+        "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(ORDER BY a.col3 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Multiple window groups",
+        "notes": "not yet supported",
+        "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Using aggregation inside ORDER BY within OVER",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY MAX(a.col3)) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Using aggregation inside PARTITION BY within OVER",
+        "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col1) OVER(PARTITION BY AVG(a.col3)) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Select an aggregate along with OVER()",
+        "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3), COUNT(a.col1) OVER() FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "External group by clause",
+        "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2) FROM a GROUP BY a.col2",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Global order by aggregate",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a ORDER BY SUM(a.col3)",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Wrong table (references table b which is not in the FROM clause)",
+        "sql": "EXPLAIN PLAN FOR SELECT MAX(b.col3) OVER(PARTITION BY b.col1 ORDER BY b.col2) FROM a ORDER BY SUM(b.col3)",
+        "expectedException": "Error explain query plan for.*"
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index e4d54a57a8..3fb95547f7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
@@ -87,6 +88,12 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
         node.getGroupSet(), node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId);
   }
 
+  @Override
+  public MultiStageOperator visitWindow(WindowNode node, PlanRequestContext context) {
+    // Window execution is not implemented yet: fail fast instead of first building the (unused) input operator.
+    throw new UnsupportedOperationException("Window not yet supported!");
+  }
+
   @Override
   public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 85440e1aed..97924586b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -48,6 +48,7 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.planner.stage.WindowNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
@@ -164,6 +165,11 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
     return _aVoid;
   }
 
+  @Override
+  public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("Window not yet supported!"); // always rejects; children are not visited
+  }
+
   @Override
   public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
     visitChildren(node, context);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org