You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/03/22 14:23:54 UTC
flink git commit: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP,
STDEV_SAMP, STDEV_POP functions on GROUP BY windows.
Repository: flink
Updated Branches:
refs/heads/master 893fabf69 -> 8c042e378
[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions on GROUP BY windows.
This closes #5706.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c042e37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c042e37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c042e37
Branch: refs/heads/master
Commit: 8c042e378b65504c7d76302d508f1e33b2cfa524
Parents: 893fabf
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Mar 15 21:04:00 2018 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 22 11:11:26 2018 +0100
----------------------------------------------------------------------
.../rel/rules/AggregateReduceFunctionsRule.java | 602 +++++++++++++++++++
.../nodes/logical/FlinkLogicalAggregate.scala | 9 +-
.../logical/FlinkLogicalWindowAggregate.scala | 17 +
.../flink/table/plan/rules/FlinkRuleSets.scala | 1 +
.../WindowAggregateReduceFunctionsRule.scala | 75 +++
.../table/runtime/aggregate/AggregateUtil.scala | 4 +-
.../table/api/batch/sql/GroupWindowTest.scala | 49 ++
.../table/api/batch/table/GroupWindowTest.scala | 45 ++
.../table/api/stream/sql/GroupWindowTest.scala | 46 ++
.../api/stream/table/GroupWindowTest.scala | 45 ++
10 files changed, 888 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
new file mode 100644
index 0000000..ce466e1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
+ *
+ * We have opened an issue to port this change to Apache Calcite (CALCITE-2216).
+ * Once CALCITE-2216 is fixed and included in a release, we can remove the copied class.
+ *
+ * Modification:
+ * - Added newCalcRel() method to be able to add fields to the projection.
+ */
+
+/**
+ * Planner rule that reduces aggregate functions in
+ * {@link org.apache.calcite.rel.core.Aggregate}s to simpler forms.
+ *
+ * <p>Rewrites:
+ * <ul>
+ *
+ * <li>AVG(x) → SUM(x) / COUNT(x)
+ *
+ * <li>STDDEV_POP(x) → SQRT(
+ * (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ * / COUNT(x))
+ *
+ * <li>STDDEV_SAMP(x) → SQRT(
+ * (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ * / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END)
+ *
+ * <li>VAR_POP(x) → (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ * / COUNT(x)
+ *
+ * <li>VAR_SAMP(x) → (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ * / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END
+ * </ul>
+ *
+ * <p>Since many of these rewrites introduce multiple occurrences of simpler
+ * forms like {@code COUNT(x)}, the rule gathers common sub-expressions as it
+ * goes.
+ */
+public class AggregateReduceFunctionsRule extends RelOptRule {
+ //~ Static fields/initializers ---------------------------------------------
+
+ /** The singleton. */
+ public static final AggregateReduceFunctionsRule INSTANCE =
+ new AggregateReduceFunctionsRule(operand(LogicalAggregate.class, any()),
+ RelFactories.LOGICAL_BUILDER);
+
+ //~ Constructors -----------------------------------------------------------
+
+ /** Creates an AggregateReduceFunctionsRule. */
+ public AggregateReduceFunctionsRule(RelOptRuleOperand operand,
+ RelBuilderFactory relBuilderFactory) {
+ super(operand, relBuilderFactory, null);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ @Override public boolean matches(RelOptRuleCall call) {
+ if (!super.matches(call)) {
+ return false;
+ }
+ Aggregate oldAggRel = (Aggregate) call.rels[0];
+ return containsAvgStddevVarCall(oldAggRel.getAggCallList());
+ }
+
+ public void onMatch(RelOptRuleCall ruleCall) {
+ Aggregate oldAggRel = (Aggregate) ruleCall.rels[0];
+ reduceAggs(ruleCall, oldAggRel);
+ }
+
+ /**
+ * Returns whether any of the aggregates are calls to AVG, STDDEV_*, VAR_*.
+ *
+ * @param aggCallList List of aggregate calls
+ */
+ private boolean containsAvgStddevVarCall(List<AggregateCall> aggCallList) {
+ for (AggregateCall call : aggCallList) {
+ if (isReducible(call.getAggregation().getKind())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns whether the aggregate call is a reducible function
+ */
+ private boolean isReducible(final SqlKind kind) {
+ if (SqlKind.AVG_AGG_FUNCTIONS.contains(kind)) {
+ return true;
+ }
+ switch (kind) {
+ case SUM:
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Reduces all calls to AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP in
+ * the aggregates list to.
+ *
+ * <p>It handles newly generated common subexpressions since this was done
+ * at the sql2rel stage.
+ */
+ private void reduceAggs(
+ RelOptRuleCall ruleCall,
+ Aggregate oldAggRel) {
+ RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+
+ List<AggregateCall> oldCalls = oldAggRel.getAggCallList();
+ final int groupCount = oldAggRel.getGroupCount();
+ final int indicatorCount = oldAggRel.getIndicatorCount();
+
+ final List<AggregateCall> newCalls = Lists.newArrayList();
+ final Map<AggregateCall, RexNode> aggCallMapping = Maps.newHashMap();
+
+ final List<RexNode> projList = Lists.newArrayList();
+
+ // pass through group key (+ indicators if present)
+ for (int i = 0; i < groupCount + indicatorCount; ++i) {
+ projList.add(
+ rexBuilder.makeInputRef(
+ getFieldType(oldAggRel, i),
+ i));
+ }
+
+ // List of input expressions. If a particular aggregate needs more, it
+ // will add an expression to the end, and we will create an extra
+ // project.
+ final RelBuilder relBuilder = ruleCall.builder();
+ relBuilder.push(oldAggRel.getInput());
+ final List<RexNode> inputExprs = new ArrayList<>(relBuilder.fields());
+
+ // create new agg function calls and rest of project list together
+ for (AggregateCall oldCall : oldCalls) {
+ projList.add(
+ reduceAgg(
+ oldAggRel, oldCall, newCalls, aggCallMapping, inputExprs));
+ }
+
+ final int extraArgCount =
+ inputExprs.size() - relBuilder.peek().getRowType().getFieldCount();
+ if (extraArgCount > 0) {
+ relBuilder.project(inputExprs,
+ CompositeList.of(
+ relBuilder.peek().getRowType().getFieldNames(),
+ Collections.<String>nCopies(extraArgCount, null)));
+ }
+ newAggregateRel(relBuilder, oldAggRel, newCalls);
+ newCalcRel(relBuilder, oldAggRel, projList);
+ ruleCall.transformTo(relBuilder.build());
+ }
+
+ private RexNode reduceAgg(
+ Aggregate oldAggRel,
+ AggregateCall oldCall,
+ List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping,
+ List<RexNode> inputExprs) {
+ final SqlKind kind = oldCall.getAggregation().getKind();
+ if (isReducible(kind)) {
+ switch (kind) {
+ case SUM:
+ // replace original SUM(x) with
+ // case COUNT(x) when 0 then null else SUM0(x) end
+ return reduceSum(oldAggRel, oldCall, newCalls, aggCallMapping);
+ case AVG:
+ // replace original AVG(x) with SUM(x) / COUNT(x)
+ return reduceAvg(oldAggRel, oldCall, newCalls, aggCallMapping, inputExprs);
+ case STDDEV_POP:
+ // replace original STDDEV_POP(x) with
+ // SQRT(
+ // (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ // / COUNT(x))
+ return reduceStddev(oldAggRel, oldCall, true, true, newCalls,
+ aggCallMapping, inputExprs);
+ case STDDEV_SAMP:
+ // replace original STDDEV_POP(x) with
+ // SQRT(
+ // (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ // / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END)
+ return reduceStddev(oldAggRel, oldCall, false, true, newCalls,
+ aggCallMapping, inputExprs);
+ case VAR_POP:
+ // replace original VAR_POP(x) with
+ // (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ // / COUNT(x)
+ return reduceStddev(oldAggRel, oldCall, true, false, newCalls,
+ aggCallMapping, inputExprs);
+ case VAR_SAMP:
+ // replace original VAR_POP(x) with
+ // (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x))
+ // / CASE COUNT(x) WHEN 1 THEN NULL ELSE COUNT(x) - 1 END
+ return reduceStddev(oldAggRel, oldCall, false, false, newCalls,
+ aggCallMapping, inputExprs);
+ default:
+ throw Util.unexpected(kind);
+ }
+ } else {
+ // anything else: preserve original call
+ RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+ final int nGroups = oldAggRel.getGroupCount();
+ List<RelDataType> oldArgTypes =
+ SqlTypeUtil.projectTypes(
+ oldAggRel.getInput().getRowType(), oldCall.getArgList());
+ return rexBuilder.addAggCall(oldCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ oldArgTypes);
+ }
+ }
+
+ private AggregateCall createAggregateCallWithBinding(
+ RelDataTypeFactory typeFactory,
+ SqlAggFunction aggFunction,
+ RelDataType operandType,
+ Aggregate oldAggRel,
+ AggregateCall oldCall,
+ int argOrdinal) {
+ final Aggregate.AggCallBinding binding =
+ new Aggregate.AggCallBinding(typeFactory, aggFunction,
+ ImmutableList.of(operandType), oldAggRel.getGroupCount(),
+ oldCall.filterArg >= 0);
+ return AggregateCall.create(aggFunction,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ ImmutableIntList.of(argOrdinal),
+ oldCall.filterArg,
+ aggFunction.inferReturnType(binding),
+ null);
+ }
+
+ private RexNode reduceAvg(
+ Aggregate oldAggRel,
+ AggregateCall oldCall,
+ List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping,
+ List<RexNode> inputExprs) {
+ final int nGroups = oldAggRel.getGroupCount();
+ final RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+ final int iAvgInput = oldCall.getArgList().get(0);
+ final RelDataType avgInputType =
+ getFieldType(
+ oldAggRel.getInput(),
+ iAvgInput);
+ final AggregateCall sumCall =
+ AggregateCall.create(SqlStdOperatorTable.SUM,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ oldCall.getArgList(),
+ oldCall.filterArg,
+ oldAggRel.getGroupCount(),
+ oldAggRel.getInput(),
+ null,
+ null);
+ final AggregateCall countCall =
+ AggregateCall.create(SqlStdOperatorTable.COUNT,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ oldCall.getArgList(),
+ oldCall.filterArg,
+ oldAggRel.getGroupCount(),
+ oldAggRel.getInput(),
+ null,
+ null);
+
+ // NOTE: these references are with respect to the output
+ // of newAggRel
+ RexNode numeratorRef =
+ rexBuilder.addAggCall(sumCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(avgInputType));
+ final RexNode denominatorRef =
+ rexBuilder.addAggCall(countCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(avgInputType));
+
+ final RelDataTypeFactory typeFactory = oldAggRel.getCluster().getTypeFactory();
+ final RelDataType avgType = typeFactory.createTypeWithNullability(
+ oldCall.getType(), numeratorRef.getType().isNullable());
+ numeratorRef = rexBuilder.ensureType(avgType, numeratorRef, true);
+ final RexNode divideRef =
+ rexBuilder.makeCall(SqlStdOperatorTable.DIVIDE, numeratorRef, denominatorRef);
+ return rexBuilder.makeCast(oldCall.getType(), divideRef);
+ }
+
+ private RexNode reduceSum(
+ Aggregate oldAggRel,
+ AggregateCall oldCall,
+ List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping) {
+ final int nGroups = oldAggRel.getGroupCount();
+ RexBuilder rexBuilder = oldAggRel.getCluster().getRexBuilder();
+ int arg = oldCall.getArgList().get(0);
+ RelDataType argType =
+ getFieldType(
+ oldAggRel.getInput(),
+ arg);
+ final AggregateCall sumZeroCall =
+ AggregateCall.create(SqlStdOperatorTable.SUM0, oldCall.isDistinct(),
+ oldCall.isApproximate(), oldCall.getArgList(), oldCall.filterArg,
+ oldAggRel.getGroupCount(), oldAggRel.getInput(), null,
+ oldCall.name);
+ final AggregateCall countCall =
+ AggregateCall.create(SqlStdOperatorTable.COUNT,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ oldCall.getArgList(),
+ oldCall.filterArg,
+ oldAggRel.getGroupCount(),
+ oldAggRel,
+ null,
+ null);
+
+ // NOTE: these references are with respect to the output
+ // of newAggRel
+ RexNode sumZeroRef =
+ rexBuilder.addAggCall(sumZeroCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(argType));
+ if (!oldCall.getType().isNullable()) {
+ // If SUM(x) is not nullable, the validator must have determined that
+ // nulls are impossible (because the group is never empty and x is never
+ // null). Therefore we translate to SUM0(x).
+ return sumZeroRef;
+ }
+ RexNode countRef =
+ rexBuilder.addAggCall(countCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(argType));
+ return rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+ rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+ countRef, rexBuilder.makeExactLiteral(BigDecimal.ZERO)),
+ rexBuilder.makeCast(sumZeroRef.getType(), rexBuilder.constantNull()),
+ sumZeroRef);
+ }
+
+ private RexNode reduceStddev(
+ Aggregate oldAggRel,
+ AggregateCall oldCall,
+ boolean biased,
+ boolean sqrt,
+ List<AggregateCall> newCalls,
+ Map<AggregateCall, RexNode> aggCallMapping,
+ List<RexNode> inputExprs) {
+ // stddev_pop(x) ==>
+ // power(
+ // (sum(x * x) - sum(x) * sum(x) / count(x))
+ // / count(x),
+ // .5)
+ //
+ // stddev_samp(x) ==>
+ // power(
+ // (sum(x * x) - sum(x) * sum(x) / count(x))
+ // / nullif(count(x) - 1, 0),
+ // .5)
+ final int nGroups = oldAggRel.getGroupCount();
+ final RelOptCluster cluster = oldAggRel.getCluster();
+ final RexBuilder rexBuilder = cluster.getRexBuilder();
+ final RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+
+ assert oldCall.getArgList().size() == 1 : oldCall.getArgList();
+ final int argOrdinal = oldCall.getArgList().get(0);
+ final RelDataType argOrdinalType = getFieldType(oldAggRel.getInput(), argOrdinal);
+ final RelDataType oldCallType =
+ typeFactory.createTypeWithNullability(oldCall.getType(),
+ argOrdinalType.isNullable());
+
+ final RexNode argRef =
+ rexBuilder.ensureType(oldCallType, inputExprs.get(argOrdinal), true);
+ final int argRefOrdinal = lookupOrAdd(inputExprs, argRef);
+
+ final RexNode argSquared = rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY,
+ argRef, argRef);
+ final int argSquaredOrdinal = lookupOrAdd(inputExprs, argSquared);
+
+ final AggregateCall sumArgSquaredAggCall =
+ createAggregateCallWithBinding(typeFactory, SqlStdOperatorTable.SUM,
+ argSquared.getType(), oldAggRel, oldCall, argSquaredOrdinal);
+
+ final RexNode sumArgSquared =
+ rexBuilder.addAggCall(sumArgSquaredAggCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(sumArgSquaredAggCall.getType()));
+
+ final AggregateCall sumArgAggCall =
+ AggregateCall.create(SqlStdOperatorTable.SUM,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ ImmutableIntList.of(argOrdinal),
+ oldCall.filterArg,
+ oldAggRel.getGroupCount(),
+ oldAggRel.getInput(),
+ null,
+ null);
+
+ final RexNode sumArg =
+ rexBuilder.addAggCall(sumArgAggCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(sumArgAggCall.getType()));
+ final RexNode sumArgCast = rexBuilder.ensureType(oldCallType, sumArg, true);
+ final RexNode sumSquaredArg =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.MULTIPLY, sumArgCast, sumArgCast);
+
+ final AggregateCall countArgAggCall =
+ AggregateCall.create(SqlStdOperatorTable.COUNT,
+ oldCall.isDistinct(),
+ oldCall.isApproximate(),
+ oldCall.getArgList(),
+ oldCall.filterArg,
+ oldAggRel.getGroupCount(),
+ oldAggRel,
+ null,
+ null);
+
+ final RexNode countArg =
+ rexBuilder.addAggCall(countArgAggCall,
+ nGroups,
+ oldAggRel.indicator,
+ newCalls,
+ aggCallMapping,
+ ImmutableList.of(argOrdinalType));
+
+ final RexNode avgSumSquaredArg =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.DIVIDE, sumSquaredArg, countArg);
+
+ final RexNode diff =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.MINUS,
+ sumArgSquared, avgSumSquaredArg);
+
+ final RexNode denominator;
+ if (biased) {
+ denominator = countArg;
+ } else {
+ final RexLiteral one =
+ rexBuilder.makeExactLiteral(BigDecimal.ONE);
+ final RexNode nul =
+ rexBuilder.makeCast(countArg.getType(), rexBuilder.constantNull());
+ final RexNode countMinusOne =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.MINUS, countArg, one);
+ final RexNode countEqOne =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.EQUALS, countArg, one);
+ denominator =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CASE,
+ countEqOne, nul, countMinusOne);
+ }
+
+ final RexNode div =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.DIVIDE, diff, denominator);
+
+ RexNode result = div;
+ if (sqrt) {
+ final RexNode half =
+ rexBuilder.makeExactLiteral(new BigDecimal("0.5"));
+ result =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.POWER, div, half);
+ }
+
+ return rexBuilder.makeCast(
+ oldCall.getType(), result);
+ }
+
+ /**
+ * Finds the ordinal of an element in a list, or adds it.
+ *
+ * @param list List
+ * @param element Element to lookup or add
+ * @param <T> Element type
+ * @return Ordinal of element in list
+ */
+ private static <T> int lookupOrAdd(List<T> list, T element) {
+ int ordinal = list.indexOf(element);
+ if (ordinal == -1) {
+ ordinal = list.size();
+ list.add(element);
+ }
+ return ordinal;
+ }
+
+ /**
+ * Do a shallow clone of oldAggRel and update aggCalls. Could be refactored
+ * into Aggregate and subclasses - but it's only needed for some
+ * subclasses.
+ *
+ * @param relBuilder Builder of relational expressions; at the top of its
+ * stack is its input
+ * @param oldAggregate LogicalAggregate to clone.
+ * @param newCalls New list of AggregateCalls
+ */
+ protected void newAggregateRel(RelBuilder relBuilder,
+ Aggregate oldAggregate,
+ List<AggregateCall> newCalls) {
+ relBuilder.aggregate(
+ relBuilder.groupKey(oldAggregate.getGroupSet(),
+ oldAggregate.getGroupSets()),
+ newCalls);
+ }
+
+ /**
+ * Add a calc with the expressions to compute the original agg calls from the
+ * decomposed ones.
+ *
+ * @param relBuilder Builder of relational expressions; at the top of its
+ * stack is its input
+ * @param oldAggregate The original LogicalAggregate that is replaced.
+ * @param exprs The expressions to compute the original agg calls.
+ */
+ protected void newCalcRel(RelBuilder relBuilder,
+ Aggregate oldAggregate,
+ List<RexNode> exprs) {
+ relBuilder.project(exprs, oldAggregate.getRowType().getFieldNames());
+ }
+
+ private RelDataType getFieldType(RelNode relNode, int i) {
+ final RelDataTypeField inputField =
+ relNode.getRowType().getFieldList().get(i);
+ return inputField.getType();
+ }
+}
+
+// End AggregateReduceFunctionsRule.java
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index e1e93c7..17b6f1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.plan.nodes.FlinkConventions
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
class FlinkLogicalAggregate(
cluster: RelOptCluster,
@@ -74,8 +74,11 @@ private class FlinkLogicalAggregateConverter
// we do not support these functions natively
// they have to be converted using the AggregateReduceFunctionsRule
- val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
- case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ // we support AVG
+ case SqlKind.AVG => true
+ // but none of the other AVG agg functions
+ case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
case _ => true
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 3e605e8..f2576f4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -33,6 +34,8 @@ import org.apache.flink.table.plan.logical.LogicalWindow
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import org.apache.flink.table.plan.nodes.FlinkConventions
+import scala.collection.JavaConverters._
+
class FlinkLogicalWindowAggregate(
window: LogicalWindow,
namedProperties: Seq[NamedWindowProperty],
@@ -103,6 +106,20 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ // we support AVG
+ case SqlKind.AVG => true
+ // but none of the other AVG agg functions
+ case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+ case _ => true
+ }
+ }
+
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalWindowAggregate]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index d3ad2ac..9f3b8e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -93,6 +93,7 @@ object FlinkRuleSets {
// reduce aggregate functions like AVG, STDDEV_POP etc.
AggregateReduceFunctionsRule.INSTANCE,
+ WindowAggregateReduceFunctionsRule.INSTANCE,
// remove unnecessary sort rule
SortRemoveRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
new file mode 100644
index 0000000..4ca2b33
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+/**
+ * Rule to convert complex aggregation functions into simpler ones.
+ * Have a look at [[AggregateReduceFunctionsRule]] for details.
+ */
+class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule(
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.any()),
+ RelFactories.LOGICAL_BUILDER) {
+
+ override def newAggregateRel(
+ relBuilder: RelBuilder,
+ oldAgg: Aggregate,
+ newCalls: util.List[AggregateCall]): Unit = {
+
+ // create a LogicalAggregate with simpler aggregation functions
+ super.newAggregateRel(relBuilder, oldAgg, newCalls)
+ // pop LogicalAggregate from RelBuilder
+ val newAgg = relBuilder.build().asInstanceOf[LogicalAggregate]
+
+ // create a new LogicalWindowAggregate (based on the new LogicalAggregate) and push it on the
+ // RelBuilder
+ val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
+ relBuilder.push(LogicalWindowAggregate.create(
+ oldWindowAgg.getWindow,
+ oldWindowAgg.getNamedProperties,
+ newAgg))
+ }
+
+ override def newCalcRel(
+ relBuilder: RelBuilder,
+ oldAgg: Aggregate,
+ exprs: util.List[RexNode]): Unit = {
+
+ // add all named properties of the window to the selection
+ val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
+ oldWindowAgg.getNamedProperties.foreach(np => exprs.add(relBuilder.field(np.name)))
+
+ // create a LogicalCalc that computes the complex aggregates and forwards the window properties
+ relBuilder.project(exprs, oldAgg.getRowType.getFieldNames)
+ }
+
+}
+
+object WindowAggregateReduceFunctionsRule {
+ val INSTANCE = new WindowAggregateReduceFunctionsRule
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index df9b1c5..ce0a9c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1259,7 +1259,7 @@ object AggregateUtil {
}
}
- case _: SqlAvgAggFunction =>
+ case a: SqlAvgAggFunction if a.kind == SqlKind.AVG =>
aggregates(index) = sqlTypeName match {
case TINYINT =>
new ByteAvgAggFunction
@@ -1413,7 +1413,7 @@ object AggregateUtil {
accTypes(index) = udagg.accType
case unSupported: SqlAggFunction =>
- throw new TableException(s"unsupported Function: '${unSupported.getName}'")
+ throw new TableException(s"Unsupported Function: '${unSupported.getName}'")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index 8d06bcd..b1369e2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -304,4 +304,53 @@ class GroupWindowTest extends TableTestBase {
util.verifySql(sql, expected)
}
+
+ @Test
+ def testDecomposableAggFunctions() = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String, Long, Timestamp)]("MyTable", 'a, 'b, 'c, 'rowtime)
+
+ val sql =
+ "SELECT " +
+ " VAR_POP(c), VAR_SAMP(c), STDDEV_POP(c), STDDEV_SAMP(c), " +
+ " TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+ " TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "rowtime", "c",
+ "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) AS $f5")
+ ),
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select",
+ "SUM($f2) AS $f0",
+ "SUM(c) AS $f1",
+ "COUNT(c) AS $f2",
+ "SUM($f3) AS $f3",
+ "SUM($f4) AS $f4",
+ "SUM($f5) AS $f5",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end",
+ "rowtime('w$) AS w$rowtime")
+ ),
+ term("select",
+ "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0",
+ "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1)))) AS EXPR$1",
+ "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS EXPR$2",
+ "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1))), 0.5)) " +
+ "AS EXPR$3",
+ "CAST(w$start) AS EXPR$4",
+ "CAST(w$end) AS EXPR$5")
+ )
+
+ util.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index ad44e09..27c1d7f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -449,4 +449,49 @@ class GroupWindowTest extends TableTestBase {
util.verifyTable(windowedTable, expected)
}
+
+ @Test
+ def testDecomposableAggFunctions(): Unit = {
+ val util = batchTestUtil()
+ val table = util.addTable[(Long, Int, String, Long)]('rowtime, 'a, 'b, 'c)
+
+ val windowedTable = table
+ .window(Tumble over 15.minutes on 'rowtime as 'w)
+ .groupBy('w)
+ .select('c.varPop, 'c.varSamp, 'c.stddevPop, 'c.stddevSamp, 'w.start, 'w.end)
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "c", "rowtime",
+ "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) AS $f5")
+ ),
+ term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+ term("select",
+ "SUM($f2) AS $f0",
+ "SUM(c) AS $f1",
+ "COUNT(c) AS $f2",
+ "SUM($f3) AS $f3",
+ "SUM($f4) AS $f4",
+ "SUM($f5) AS $f5",
+ "start('w) AS TMP_4",
+ "end('w) AS TMP_5")
+ ),
+ term("select",
+ "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS TMP_0",
+ "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1)))) AS TMP_1",
+ "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS TMP_2",
+ "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1))), 0.5)) " +
+ "AS TMP_3",
+ "TMP_4",
+ "TMP_5")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index d7d5f1e..d292834 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -260,4 +260,50 @@ class GroupWindowTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+
+ @Test
+ def testDecomposableAggFunctions() = {
+
+ val sql =
+ "SELECT " +
+ " VAR_POP(c), VAR_SAMP(c), STDDEV_POP(c), STDDEV_SAMP(c), " +
+ " TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+ " TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime", "c",
+ "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) AS $f5")
+ ),
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select",
+ "SUM($f2) AS $f0",
+ "SUM(c) AS $f1",
+ "COUNT(c) AS $f2",
+ "SUM($f3) AS $f3",
+ "SUM($f4) AS $f4",
+ "SUM($f5) AS $f5",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end",
+ "rowtime('w$) AS w$rowtime",
+ "proctime('w$) AS w$proctime")
+ ),
+ term("select",
+ "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0",
+ "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1)))) AS EXPR$1",
+ "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS EXPR$2",
+ "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1))), 0.5)) " +
+ "AS EXPR$3",
+ "w$start AS EXPR$4",
+ "w$end AS EXPR$5")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c042e37/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index 260726b..a59ad83 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -782,4 +782,49 @@ class GroupWindowTest extends TableTestBase {
util.verifyTable(windowedTable, expected)
}
+
+ @Test
+ def testDecomposableAggFunctions(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String, Long)]('rowtime.rowtime, 'a, 'b, 'c)
+
+ val windowedTable = table
+ .window(Tumble over 15.minutes on 'rowtime as 'w)
+ .groupBy('w)
+ .select('c.varPop, 'c.varSamp, 'c.stddevPop, 'c.stddevSamp, 'w.start, 'w.end)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "c", "rowtime",
+ "*(c, c) AS $f2", "*(c, c) AS $f3", "*(c, c) AS $f4", "*(c, c) AS $f5")
+ ),
+ term("window", TumblingGroupWindow('w, 'rowtime, 900000.millis)),
+ term("select",
+ "SUM($f2) AS $f0",
+ "SUM(c) AS $f1",
+ "COUNT(c) AS $f2",
+ "SUM($f3) AS $f3",
+ "SUM($f4) AS $f4",
+ "SUM($f5) AS $f5",
+ "start('w) AS TMP_4",
+ "end('w) AS TMP_5")
+ ),
+ term("select",
+ "CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS TMP_0",
+ "CAST(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1)))) AS TMP_1",
+ "CAST(POWER(/(-($f4, /(*($f1, $f1), $f2)), $f2), 0.5)) AS TMP_2",
+ "CAST(POWER(/(-($f5, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 1))), 0.5)) " +
+ "AS TMP_3",
+ "TMP_4",
+ "TMP_5")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
}