You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/07 17:56:51 UTC
[2/2] flink git commit: [FLINK-5803] [table] Add support for procTime
partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.
[FLINK-5803] [table] Add support for procTime partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.
This closes #3397.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fb8f3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fb8f3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fb8f3b
Branch: refs/heads/master
Commit: 53fb8f3b532a1be3d3cd655fbb5516a7e1ae8ada
Parents: cd801aa
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Wed Feb 15 18:16:06 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Mar 7 18:56:15 2017 +0100
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 7 +
.../table/codegen/calls/FunctionGenerator.scala | 2 +-
.../flink/table/plan/nodes/OverAggregate.scala | 95 ++++++++++
.../datastream/DataStreamOverAggregate.scala | 185 +++++++++++++++++++
.../flink/table/plan/rules/FlinkRuleSets.scala | 7 +-
.../DataStreamOverAggregateRule.scala | 63 +++++++
.../table/runtime/aggregate/AggregateUtil.scala | 63 ++++---
...UnboundedProcessingOverProcessFunction.scala | 85 +++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 101 +++++++++-
.../scala/stream/sql/WindowAggregateTest.scala | 56 ++++++
.../utils/StreamingWithStateTestBase.scala | 40 ++++
11 files changed, 673 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index c6071b0..428b947 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -117,6 +117,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7d55957..27e6dc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -330,7 +330,7 @@ object FunctionGenerator {
)
// generate a constant for time indicator functions.
- // this is a temporary solution and will be removed when FLINK-5884 is implemented.
+ // this is a temporary solution and will be removed when FLINK-5884 is implemented.
case ProcTimeExtractor | EventTimeExtractor =>
Some(new CallGenerator {
override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
new file mode 100644
index 0000000..793ab23
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import scala.collection.JavaConverters._
+
+/**
+  * Shared helpers that render the parts of an OVER aggregation (partitioning, ordering,
+  * window bounds and aggregate calls) as strings for plan explanations and operator names.
+  */
+trait OverAggregate {
+
+  /** Renders the names of the given partition key fields as a comma-separated list. */
+  private[flink] def partitionToString(inputType: RelDataType, partition: Array[Int]): String = {
+    val inFields = inputType.getFieldNames.asScala
+    partition.map(inFields(_)).mkString(", ")
+  }
+
+  /**
+    * Renders the ORDER BY fields of a window as a comma-separated list.
+    *
+    * Note: each field is rendered by its `getValue`, i.e. its type (such as a PROCTIME or
+    * ROWTIME time indicator type), not by its name.
+    */
+  private[flink] def orderingToString(
+      inputType: RelDataType,
+      orderFields: java.util.List[RelFieldCollation]): String = {
+
+    val inFields = inputType.getFieldList.asScala
+    orderFields.asScala
+      .map(collation => inFields(collation.getFieldIndex).getValue)
+      .mkString(", ")
+  }
+
+  /** Renders the frame of a window as `BETWEEN <lower bound> AND <upper bound>`. */
+  private[flink] def windowRange(overWindow: Group): String = {
+    s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+  }
+
+  /**
+    * Renders all input fields followed by the aggregate calls, each paired positionally with
+    * an output field (rendered as `expr AS name` when the two differ).
+    *
+    * @param inputType       row type of the input of the OVER aggregation
+    * @param rowType         row type of the output of the OVER aggregation
+    * @param namedAggregates aggregate calls paired with their output names
+    * @return the comma-separated select list
+    */
+  private[flink] def aggregationToString(
+      inputType: RelDataType,
+      rowType: RelDataType,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
+
+    val inFields = inputType.getFieldList.asScala.map(fieldDisplayName)
+    val outFields = rowType.getFieldList.asScala.map(fieldDisplayName)
+
+    // only the first argument of an aggregate call is shown; calls without arguments
+    // (e.g. COUNT(*)) are rendered with "*"
+    val aggStrings = namedAggregates.map(_.getKey).map { call =>
+      val arg = if (call.getArgList.isEmpty) "*" else inFields(call.getArgList.get(0))
+      s"${call.getAggregation}($arg)"
+    }
+
+    (inFields ++ aggStrings).zip(outFields).map {
+      case (f, o) => if (f == o) f else s"$f AS $o"
+    }.mkString(", ")
+  }
+
+  /**
+    * Display name of a field: time indicator fields are shown as PROCTIME / ROWTIME, all
+    * other fields by their name. Works on any [[RelDataTypeField]] implementation.
+    */
+  private def fieldDisplayName(field: RelDataTypeField): String = field.getType match {
+    case _: ProcTimeType => "PROCTIME"
+    case _: RowTimeType => "ROWTIME"
+    case _ => field.getName
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
new file mode 100644
index 0000000..db115e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rel.core.Window.Group
+import java.util.{List => JList}
+
+import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+
+/**
+  * Flink RelNode for a streaming aggregation over an OVER window.
+  *
+  * Only processing-time windows from UNBOUNDED PRECEDING to CURRENT ROW with a PARTITION BY
+  * clause are translated so far; all other windows raise a [[TableException]].
+  */
+class DataStreamOverAggregate(
+    logicWindow: Window,
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    rowRelDataType: RelDataType,
+    inputType: RelDataType)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with OverAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new DataStreamOverAggregate(
+      logicWindow,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      getRowType,
+      inputType)
+  }
+
+  override def toString: String = {
+    s"OverAggregate($aggOpName)"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    super.explainTerms(pw)
+      .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
+      .item("orderBy", orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
+      .itemIf("rows", windowRange(overWindow), overWindow.isRows)
+      .itemIf("range", windowRange(overWindow), !overWindow.isRows)
+      .item("select", aggregationToString(inputType, getRowType, namedAggregates))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+    if (logicWindow.groups.size > 1) {
+      throw new TableException(
+        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
+    }
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
+      throw new TableException(
+        "Unsupported use of OVER windows. The window may only be ordered by a single time column.")
+    }
+
+    val timeType = inputType
+      .getFieldList
+      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
+      .getValue
+
+    timeType match {
+      case _: ProcTimeType =>
+        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          // the input is only translated once the window is known to be supported
+          val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+        } else {
+          throw new TableException(
+            "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
+              "condition.")
+        }
+      case _: RowTimeType =>
+        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+      case _ =>
+        throw new TableException(s"Unsupported time type $timeType")
+    }
+  }
+
+  /**
+    * Translates a processing-time OVER window from UNBOUNDED PRECEDING to CURRENT ROW.
+    *
+    * @param inputDS the translated input stream
+    * @return a stream of the forwarded input fields followed by the aggregate values
+    * @throws TableException if the window has no PARTITION BY clause (not supported yet)
+    */
+  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
+      inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+
+    // global non-partitioned aggregation
+    if (partitionKeys.isEmpty) {
+      throw new TableException(
+        "Non-partitioned processing time OVER aggregation is not supported yet.")
+    }
+
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+      namedAggregates,
+      inputType)
+
+    // partitioned aggregation: the stream is keyed by the PARTITION BY fields
+    inputDS
+      .keyBy(partitionKeys: _*)
+      .process(processFunction)
+      .returns(rowTypeInfo)
+      .name(aggOpName)
+  }
+
+  /** Pairs each aggregate call of the window with a generated name `w0$o<index>`. */
+  private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = {
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
+    for (i <- 0 until aggregateCalls.size())
+      yield new CalcitePair[AggregateCall, String](aggregateCalls.get(i), "w0$o" + i)
+  }
+
+  /** Human-readable description of this aggregation, used as operator name and in toString. */
+  private def aggOpName: String = {
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    val partitionString =
+      if (partitionKeys.nonEmpty) {
+        s"PARTITION BY: ${partitionToString(inputType, partitionKeys)}, "
+      } else {
+        ""
+      }
+    val orderString = orderingToString(inputType, overWindow.orderKeys.getFieldCollations)
+    val frameType = if (overWindow.isRows) "ROWS" else "RANGE"
+    val selectString = aggregationToString(inputType, getRowType, namedAggregates)
+
+    // the frame type and its bounds are separated by a space, e.g. "ROWS BETWEEN ..."
+    s"over: (${partitionString}ORDER BY: $orderString, " +
+      s"$frameType ${windowRange(overWindow)}, " +
+      s"select: ($selectString))"
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 8f16d32..39e4353 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -35,7 +35,8 @@ object FlinkRuleSets {
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
- ReduceExpressionsRule.JOIN_INSTANCE
+ ReduceExpressionsRule.JOIN_INSTANCE,
+ ProjectToWindowRule.PROJECT
)
/**
@@ -134,7 +135,8 @@ object FlinkRuleSets {
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
- ReduceExpressionsRule.CALC_INSTANCE
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ProjectToWindowRule.PROJECT
)
/**
@@ -167,6 +169,7 @@ object FlinkRuleSets {
UnionEliminatorRule.INSTANCE,
// translate to DataStream nodes
+ DataStreamOverAggregateRule.INSTANCE,
DataStreamAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
new file mode 100644
index 0000000..214d68b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalWindow
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a [[LogicalWindow]] (an aggregation over OVER windows) into a
+  * [[DataStreamOverAggregate]].
+  */
+class DataStreamOverAggregateRule
+  extends ConverterRule(
+    classOf[LogicalWindow],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamOverAggregateRule") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convertInput: RelNode =
+      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
+
+    // Converting the input only changes its traits, not its row type, so the row type is read
+    // from the converted node directly instead of casting it to RelSubset (that cast only
+    // holds when the planner hands back a RelSubset).
+    val inputRowType = convertInput.getRowType
+
+    new DataStreamOverAggregate(
+      logicWindow,
+      rel.getCluster,
+      traitSet,
+      convertInput,
+      rel.getRowType,
+      inputRowType)
+  }
+}
+
+/** Holds the singleton instance of [[DataStreamOverAggregateRule]]. */
+object DataStreamOverAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamOverAggregateRule
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 745660d..acb6cd0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun._
import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
@@ -36,6 +36,7 @@ import org.apache.flink.table.plan.logical._
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.functions.aggfunctions._
import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
@@ -78,8 +79,7 @@ object AggregateUtil {
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupings.length,
- false)
+ needRetraction = false)
val mapReturnType: RowTypeInfo =
createDataSetAggregateBufferDataType(groupings, aggregates, inputType)
@@ -94,6 +94,33 @@ object AggregateUtil {
}
/**
+    * Creates a [[ProcessFunction]] for an unbounded processing-time OVER aggregation.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType Input row type
+    * @return [[UnboundedProcessingOverProcessFunction]]
+    */
+  private[flink] def CreateUnboundedProcessingOverProcessFunction(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType): UnboundedProcessingOverProcessFunction = {
+
+    val aggregateCalls = namedAggregates.map(_.getKey)
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(aggregateCalls, inputType, needRetraction = false)
+
+    // no grouping fields are passed: presumably the state row then holds only the
+    // accumulators — TODO confirm against createDataSetAggregateBufferDataType
+    val aggregationStateType: RowTypeInfo =
+      createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+
+    // all input fields are counted as forwarded fields
+    val forwardedFieldCount = inputType.getFieldCount
+
+    new UnboundedProcessingOverProcessFunction(
+      aggregates,
+      aggFields,
+      forwardedFieldCount,
+      aggregationStateType)
+  }
+
+ /**
* Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
* The output of the function contains the grouping keys and the timestamp and the intermediate
* aggregate values of all aggregate function. The timestamp field is aligned to time window
@@ -126,8 +153,7 @@ object AggregateUtil {
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupings.length,
- false)
+ needRetraction = false)
val mapReturnType: RowTypeInfo =
createDataSetAggregateBufferDataType(
@@ -180,8 +206,7 @@ object AggregateUtil {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupings.length,
- false)._2
+ needRetraction = false)._2
// the mapping relation between field index of intermediate aggregate Row and output Row.
val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -278,7 +303,7 @@ object AggregateUtil {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- 0)._2
+ needRetraction = false)._2
window match {
case EventTimeSessionGroupWindow(_, _, gap) =>
@@ -327,8 +352,7 @@ object AggregateUtil {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupings.length,
- false)._2
+ needRetraction = false)._2
window match {
case EventTimeSessionGroupWindow(_, _, gap) =>
@@ -366,11 +390,10 @@ object AggregateUtil {
inGroupingSet: Boolean)
: RichGroupReduceFunction[Row, Row] = {
- val (aggFieldIndex, aggregates) = transformToAggregateFunctions(
+ val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupings.length,
- false)
+ needRetraction = false)._2
val (groupingOffsetMapping, aggOffsetMapping) =
getGroupingOffsetAndAggOffsetMapping(
@@ -466,8 +489,7 @@ object AggregateUtil {
transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- groupKeysIndex.length,
- false)
+ needRetraction = false)
val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
@@ -496,8 +518,7 @@ object AggregateUtil {
val aggregateList = transformToAggregateFunctions(
aggregateCalls,
inputType,
- groupKeysCount,
- false)._2
+ needRetraction = false)._2
doAllSupportPartialMerge(aggregateList)
}
@@ -598,7 +619,7 @@ object AggregateUtil {
val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
(p, x) => p match {
- case NamedWindowProperty(name, prop) =>
+ case NamedWindowProperty(_, prop) =>
prop match {
case WindowStart(_) if x._1.isDefined =>
throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
@@ -617,7 +638,6 @@ object AggregateUtil {
private def transformToAggregateFunctions(
aggregateCalls: Seq[AggregateCall],
inputType: RelDataType,
- groupKeysCount: Int,
needRetraction: Boolean)
: (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
@@ -625,11 +645,6 @@ object AggregateUtil {
val aggFieldIndexes = new Array[Int](aggregateCalls.size)
val aggregates = new Array[TableAggregateFunction[_ <: Any]](aggregateCalls.size)
- // set the start offset of aggregate buffer value to group keys' length,
- // as all the group keys would be moved to the start fields of intermediate
- // aggregate data.
- var aggOffset = groupKeysCount
-
// create aggregate function instances by function type and aggregate field data type.
aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
val argList: util.List[Integer] = aggregateCall.getArgList
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
new file mode 100644
index 0000000..058b4a7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+  * Process function for processing-time OVER aggregations whose frame reaches from
+  * UNBOUNDED PRECEDING to CURRENT ROW.
+  *
+  * For every input row the accumulators held in state are updated with the row, and a row
+  * consisting of the forwarded input fields followed by the current aggregate values is emitted.
+  *
+  * @param aggregates           the aggregate functions to evaluate
+  * @param aggFields            per aggregate, the index of its argument field in the input row
+  * @param forwardedFieldCount  number of leading input fields copied to the output row
+  * @param aggregationStateType type of the state row that holds the accumulators
+  */
+class UnboundedProcessingOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  // output row, reused for every emitted record
+  private var output: Row = _
+  // holds the accumulators of all aggregates (one field per aggregate)
+  private var state: ValueState[Row] = _
+
+  override def open(config: Configuration): Unit = {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+      input: Row,
+      ctx: ProcessFunction[Row, Row]#Context,
+      out: Collector[Row]): Unit = {
+
+    var accumulators = state.value()
+    if (null == accumulators) {
+      // nothing stored yet: start from fresh accumulators
+      accumulators = createAccumulators()
+    }
+
+    // copy the forwarded input fields
+    var i = 0
+    while (i < forwardedFieldCount) {
+      output.setField(i, input.getField(i))
+      i += 1
+    }
+
+    // add the row to each accumulator and append the updated aggregate values
+    i = 0
+    while (i < aggregates.length) {
+      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+      output.setField(forwardedFieldCount + i, aggregates(i).getValue(accumulator))
+      i += 1
+    }
+    state.update(accumulators)
+
+    out.collect(output)
+  }
+
+  /** Creates a row with a fresh accumulator for every aggregate function. */
+  private def createAccumulators(): Row = {
+    val accumulators = new Row(aggregates.length)
+    var i = 0
+    while (i < aggregates.length) {
+      accumulators.setField(i, aggregates(i).createAccumulator())
+      i += 1
+    }
+    accumulators
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 70bec72..cf8e442 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -20,17 +20,28 @@ package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala.stream.utils.{StreamingWithStateTestBase, StreamITCase,
+StreamTestData}
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
import scala.collection.mutable
-class SqlITCase extends StreamingMultipleProgramsTestBase {
+class SqlITCase extends StreamingWithStateTestBase {
+
+  /** Test input as (a, b, c): six "Hello" rows (a = b = 1..6) and three "Hello World" rows. */
+  val data: List[(Long, Int, String)] =
+    List.tabulate(6)(i => ((i + 1).toLong, i + 1, "Hello")) ++
+      List((7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World"))
/** test selection **/
@Test
@@ -171,4 +182,86 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
val expected = mutable.MutableList("Hello", "Hello world")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+    // Per-key running COUNT and SUM over an unbounded processing-time RANGE window.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    // The intermediate running sums depend on the order in which rows arrive; a single
+    // parallel instance keeps that order identical on every run.
+    env.setParallelism(1)
+
+    tEnv.registerTable("T1", env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c))
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    tEnv.sql(sqlQuery).toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // Each row is "c,count,sum" as emitted after every input row of its partition.
+    val expected = mutable.MutableList(
+      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+    // Per-key running COUNT over an unbounded processing-time ROWS window.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    // Parallelism is not pinned here: the expected per-key counts (1..n) are the same
+    // whatever order the rows of one key arrive in.
+    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    // Every fragment ends with a space so the concatenated SQL does not glue
+    // "CURRENT ROW)" to the following "from" keyword.
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // Each row is "c,count" as emitted after every input row of its partition.
+    val expected = mutable.MutableList(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+   * All aggregates must be computed on the same window.
+   *
+   * The two OVER clauses below partition by different columns (`c` and `b`), so the query
+   * is expected to be rejected with a [[TableException]].
+   */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTable("T1", env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c))
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY b ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    // The TableException is expected somewhere along translation/execution of this query.
+    tEnv.sql(sqlQuery).toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index c1d39aa..e12572f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -129,4 +129,60 @@ class WindowAggregateTest extends TableTestBase {
val expected = ""
streamUtil.verifySql(sql, expected)
}
+
+  /**
+   * Verifies the plan for COUNT and SUM over the same unbounded processing-time RANGE window:
+   * a Calc that adds PROCTIME(), the OverAggregate, and a final Calc projecting the results.
+   */
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        // SUM appears as CASE(count > 0, $SUM0, null); that whole CASE is ONE projected
+        // field, so it is passed to `term` as a single argument.
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0), CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  /**
+   * Verifies the plan for COUNT over an unbounded processing-time ROWS window, built bottom-up
+   * from the input Calc through the OverAggregate to the final projection.
+   */
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    // Input projection that materializes the processing-time attribute.
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "c", "PROCTIME() AS $2"))
+
+    val overAggregate = unaryNode(
+      "DataStreamOverAggregate",
+      inputCalc,
+      term("partitionBy", "c"),
+      term("orderBy", "PROCTIME"),
+      term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+      term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0"))
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      overAggregate,
+      term("select", "c", "w0$o0 AS $1"))
+
+    streamUtil.verifySql(sql, expected)
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
new file mode 100644
index 0000000..5f3f6ca
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.utils
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+
+import org.junit.rules.TemporaryFolder
+import org.junit.Rule
+
+/**
+ * Base class for streaming ITCases that run with a RocksDB state backend.
+ *
+ * Every call to [[getStateBackend]] creates a new backend whose local DB directory and
+ * checkpoint directory are fresh sub-folders of a JUnit `TemporaryFolder` rule.
+ */
+class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
+
+  val _tempFolder = new TemporaryFolder
+
+  // JUnit applies a @Rule only to a public field or method. A Scala `val` compiles to a
+  // private field plus an accessor, so the rule is exposed through a def instead.
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Returns a new RocksDB backend with fresh DB-storage and checkpoint directories. */
+  def getStateBackend: RocksDBStateBackend = {
+    val localDbDir = tempFolder.newFolder()
+    val checkpointDir = tempFolder.newFolder()
+    val rocksDbBackend = new RocksDBStateBackend(checkpointDir.toURI.toString)
+    rocksDbBackend.setDbStoragePath(localDbDir.getAbsolutePath)
+    rocksDbBackend
+  }
+}