You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/16 05:22:41 UTC
[flink] branch master updated: [FLINK-11945][table-runtime-blink]
Support over aggregation for blink streaming runtime
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e6bbc09 [FLINK-11945][table-runtime-blink] Support over aggregation for blink streaming runtime
e6bbc09 is described below
commit e6bbc09e6999c08ae087559c365f94d2c1f38efa
Author: XuQianJin-Stars <x1...@163.com>
AuthorDate: Tue Apr 23 16:08:48 2019 +0800
[FLINK-11945][table-runtime-blink] Support over aggregation for blink streaming runtime
This closes #8244
---
.../physical/stream/StreamExecOverAggregate.scala | 343 ++++++-
.../flink/table/plan/util/AggregateUtil.scala | 8 +-
.../flink/table/plan/util/OverAggregateUtil.scala | 1 -
.../sql/validation/OverWindowValidationTest.scala | 60 ++
.../stream/sql/agg/OverWindowAggregateTest.scala | 9 +
.../table/runtime/harness/HarnessTestBase.scala | 111 +++
.../runtime/harness/OverWindowHarnessTest.scala | 953 ++++++++++++++++++
.../runtime/stream/sql/OverWindowITCase.scala | 1025 ++++++++++++++++++++
.../KeyedProcessFunctionWithCleanupState.java | 11 +
.../AbstractRowTimeUnboundedPrecedingOver.java | 266 +++++
.../ProcTimeRangeBoundedPrecedingFunction.java | 246 +++++
.../over/ProcTimeRowsBoundedPrecedingFunction.java | 235 +++++
.../over/ProcTimeUnboundedPrecedingFunction.java | 126 +++
.../over/RowTimeRangeBoundedPrecedingFunction.java | 289 ++++++
.../RowTimeRangeUnboundedPrecedingFunction.java | 79 ++
.../over/RowTimeRowsBoundedPrecedingFunction.java | 301 ++++++
.../RowTimeRowsUnboundedPrecedingFunction.java | 71 ++
.../apache/flink/table/type/TypeConverters.java | 17 +-
.../table/runtime/util/BaseRowHarnessAssertor.java | 20 +-
.../table/runtime/util/StreamRecordUtils.java | 45 +
20 files changed, 4189 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index 93e6831..896d97d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -17,25 +17,36 @@
*/
package org.apache.flink.table.plan.nodes.physical.stream
-import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
import org.apache.flink.table.CalcitePair
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.util.RelExplainUtil
+import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
+import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList
+import org.apache.flink.table.plan.util.{KeySelectorUtil, OverAggregateUtil, RelExplainUtil}
+import org.apache.flink.table.runtime.over._
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Window.Group
import org.apache.calcite.rel.core.{AggregateCall, Window}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Stream physical RelNode for time-based over [[Window]].
@@ -96,7 +107,7 @@ class StreamExecOverAggregate(
override def explainTerms(pw: RelWriter): RelWriter = {
val overWindow: Group = logicWindow.groups.get(0)
- val constants: Seq[RexLiteral] = logicWindow.constants
+ val constants: Seq[RexLiteral] = logicWindow.constants.asScala
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
@@ -123,7 +134,7 @@ class StreamExecOverAggregate(
//~ ExecNode methods -----------------------------------------------------------
override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
- getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+ // JavaConversions' implicits were replaced by JavaConverters, so convert explicitly both ways
+ getInputs.asScala.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]]).asJava
}
override def replaceInputNode(
@@ -134,6 +145,324 @@ class StreamExecOverAggregate(
override protected def translateToPlanInternal(
tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
- throw new TableException("Implements this")
+ val tableConfig = tableEnv.getConfig
+
+ // this node translates exactly one window group; several groups are rejected
+ if (logicWindow.groups.size > 1) {
+   throw new TableException(
+     "All aggregates must be computed on the same window.")
+ }
+
+ val overWindow: Group = logicWindow.groups.get(0)
+
+ val orderKeys = overWindow.orderKeys.getFieldCollations
+
+ if (orderKeys.size() != 1) {
+   throw new TableException(
+     "The window can only be ordered by a single time column.")
+ }
+ val orderKey = orderKeys.get(0)
+
+ if (!orderKey.direction.equals(ASCENDING)) {
+   throw new TableException(
+     "The window can only be ordered in ASCENDING mode.")
+ }
+
+ val inputDS = getInputNodes.get(0).translateToPlan(tableEnv)
+   .asInstanceOf[StreamTransformation[BaseRow]]
+
+ val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+ if (inputIsAccRetract) {
+   throw new TableException(
+     "Retraction on Over window aggregation is not supported yet. " +
+       "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+ }
+
+ // warn when a partitioned window runs without an idle-state retention interval
+ if (!overWindow.keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+   LOG.warn(
+     "No state retention interval configured for a query which accumulates state. " +
+       "Please provide a query configuration with valid retention interval to prevent " +
+       "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+ }
+
+ val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+ // check time field
+ if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+   && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+   throw new TableException(
+     "OVER windows' ordering in stream mode must be defined on a time attribute.")
+ }
+
+ // identify window rowtime attribute; after the check above a non-rowtime order key
+ // is necessarily a proctime attribute, so None stands for processing time
+ val rowTimeIdx: Option[Int] =
+   if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) Some(orderKey.getFieldIndex) else None
+
+ val codeGenCtx = CodeGeneratorContext(tableConfig)
+ val aggregateCalls = overWindow.getAggregateCalls(logicWindow).asScala
+ val isRowsClause = overWindow.isRows
+ val constants = logicWindow.constants.asScala
+ val constantTypes = constants.map(c => FlinkTypeFactory.toInternalType(c.getType))
+
+ val fieldNames = inputRowType.getFieldNames.asScala
+ val fieldTypes = inputRowType.getFieldList.asScala
+   .map(c => FlinkTypeFactory.toInternalType(c.getType))
+
+ val inRowType = FlinkTypeFactory.toInternalRowType(inputRel.getRowType)
+ val outRowType = FlinkTypeFactory.toInternalRowType(outputRowType)
+
+ // aggregate input = input fields followed by the window constants, named TMP0, TMP1, ...
+ val aggInputType = tableEnv.getTypeFactory.buildRelDataType(
+   fieldNames ++ constants.indices.map(i => "TMP" + i),
+   fieldTypes ++ constantTypes)
+
+ val overProcessFunction = if (overWindow.lowerBound.isPreceding
+   && overWindow.lowerBound.isUnbounded
+   && overWindow.upperBound.isCurrentRow) {
+
+   // unbounded OVER window
+   createUnboundedOverProcessFunction(
+     codeGenCtx,
+     aggregateCalls,
+     constants,
+     aggInputType,
+     rowTimeIdx,
+     isRowsClause,
+     tableConfig,
+     tableEnv.getRelBuilder,
+     tableConfig.getNullCheck)
+
+ } else if (overWindow.lowerBound.isPreceding
+   && !overWindow.lowerBound.isUnbounded
+   && overWindow.upperBound.isCurrentRow) {
+
+   val boundValue = OverAggregateUtil.getBoundary(logicWindow, overWindow.lowerBound)
+
+   // A bare `BigDecimal` here is scala.math.BigDecimal, whereas decimal values taken from
+   // Calcite literals would be java.math.BigDecimal. Test both so a decimal bound is reported
+   // here instead of failing with a ClassCastException in the Long cast below.
+   if (boundValue.isInstanceOf[java.math.BigDecimal] || boundValue.isInstanceOf[BigDecimal]) {
+     throw new TableException(
+       "the specific value is decimal which haven not supported yet.")
+   }
+   // bounded OVER window
+   val precedingOffset = -1 * boundValue.asInstanceOf[Long] + (if (isRowsClause) 1 else 0)
+
+   createBoundedOverProcessFunction(
+     codeGenCtx,
+     aggregateCalls,
+     constants,
+     aggInputType,
+     rowTimeIdx,
+     isRowsClause,
+     precedingOffset,
+     tableConfig,
+     tableEnv.getRelBuilder,
+     tableConfig.getNullCheck)
+
+ } else {
+   throw new TableException(
+     "OVER RANGE FOLLOWING windows are not supported yet.")
+ }
+
+ val partitionKeys: Array[Int] = overWindow.keys.toArray
+ val inputTypeInfo = inRowType.toTypeInfo
+
+ val selector = KeySelectorUtil.getBaseRowSelector(partitionKeys, inputTypeInfo)
+
+ val returnTypeInfo = outRowType.toTypeInfo
+   .asInstanceOf[BaseRowTypeInfo]
+
+ // partitioned aggregation
+ val operator = new KeyedProcessOperator(overProcessFunction)
+
+ val ret = new OneInputTransformation(
+   inputDS,
+   "OverAggregate",
+   operator,
+   returnTypeInfo,
+   inputDS.getParallelism)
+
+ // without partition keys the operator is pinned to a single parallel instance
+ if (partitionKeys.isEmpty) {
+   ret.setParallelism(1)
+   ret.setMaxParallelism(1)
+ }
+
+ // set KeyType and Selector for state
+ ret.setStateKeySelector(selector)
+ ret.setStateKeyType(selector.getProducedType)
+ ret
+ }
+
+ /**
+  * Builds the [[KeyedProcessFunction]] for an OVER window that spans
+  * UNBOUNDED PRECEDING up to CURRENT ROW.
+  *
+  * @param ctx            code generator context
+  * @param aggregateCalls physical calls to the aggregate functions
+  * @param constants      constant arguments of the aggregate calls, e.g. the 1 in sum(1)
+  * @param aggInputType   row type seen by the aggregates: input fields followed by constants
+  * @param rowTimeIdx     index of the rowtime field, or None for processing time
+  * @param isRowsClause   true for a ROWS clause, false for a RANGE clause
+  * @param tableConfig    source of the min/max idle state retention times
+  * @param relBuilder     handed to the aggregate-handler code generator
+  * @param nullCheck      not read by this method
+  * @return a rowtime ROWS, rowtime RANGE, or proctime unbounded-preceding function
+  */
+ private def createUnboundedOverProcessFunction(
+     ctx: CodeGeneratorContext,
+     aggregateCalls: Seq[AggregateCall],
+     constants: Seq[RexLiteral],
+     aggInputType: RelDataType,
+     rowTimeIdx: Option[Int],
+     isRowsClause: Boolean,
+     tableConfig: TableConfig,
+     relBuilder: RelBuilder,
+     nullCheck: Boolean): KeyedProcessFunction[BaseRow, BaseRow, BaseRow] = {
+
+   // the unbounded handler is generated with accumulate only, so no retraction is requested
+   val retract = false
+   val aggInfoList = transformToStreamAggregateInfoList(
+     aggregateCalls,
+     // aggInputType (input fields + constants) instead of the plain input row type
+     aggInputType,
+     Array.fill(aggregateCalls.size)(retract),
+     needInputCount = retract,
+     isStateBackendDataViews = true)
+
+   val inputFieldTypes = inputRowType.getFieldList.asScala
+     .map(field => FlinkTypeFactory.toInternalType(field.getType))
+     .toArray
+
+   val genAggsHandler = new AggsHandlerCodeGenerator(
+       ctx,
+       relBuilder,
+       inputFieldTypes,
+       copyInputField = false)
+     .needAccumulate()
+     // the constants must be handed to the generator for over aggregation
+     .withConstants(constants)
+     .generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList)
+
+   val accTypes = aggInfoList.getAccTypes.map(TypeConverters.createInternalTypeFromTypeInfo)
+   val minRetention = tableConfig.getMinIdleStateRetentionTime
+   val maxRetention = tableConfig.getMaxIdleStateRetentionTime
+
+   rowTimeIdx match {
+     case Some(timeIdx) if isRowsClause =>
+       new RowTimeRowsUnboundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes, timeIdx)
+     case Some(timeIdx) =>
+       new RowTimeRangeUnboundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes, timeIdx)
+     case None =>
+       new ProcTimeUnboundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes)
+   }
+ }
+
+ /**
+  * Builds the [[KeyedProcessFunction]] for an OVER window bounded by
+  * `n PRECEDING AND CURRENT ROW`, for either a ROWS or a RANGE clause.
+  *
+  * @param ctx             code generator context
+  * @param aggregateCalls  physical calls to the aggregate functions
+  * @param constants       constant arguments of the aggregate calls, e.g. the 1 in sum(1)
+  * @param aggInputType    row type seen by the aggregates: input fields followed by constants
+  * @param rowTimeIdx      index of the rowtime field, or None for processing time
+  * @param isRowsClause    true for a ROWS clause, false for a RANGE clause
+  * @param precedingOffset window extent passed unchanged to the runtime function
+  * @param tableConfig     source of the min/max idle state retention times
+  * @param relBuilder      handed to the aggregate-handler code generator
+  * @param nullCheck       not read by this method
+  * @return a rowtime or proctime, ROWS or RANGE bounded-preceding function
+  */
+ private def createBoundedOverProcessFunction(
+     ctx: CodeGeneratorContext,
+     aggregateCalls: Seq[AggregateCall],
+     constants: Seq[RexLiteral],
+     aggInputType: RelDataType,
+     rowTimeIdx: Option[Int],
+     isRowsClause: Boolean,
+     precedingOffset: Long,
+     tableConfig: TableConfig,
+     relBuilder: RelBuilder,
+     nullCheck: Boolean): KeyedProcessFunction[BaseRow, BaseRow, BaseRow] = {
+
+   // bounded windows generate both accumulate and retract (presumably to drop expired rows)
+   val retract = true
+   val aggInfoList = transformToStreamAggregateInfoList(
+     aggregateCalls,
+     // aggInputType (input fields + constants) instead of the plain input row type
+     aggInputType,
+     Array.fill(aggregateCalls.size)(retract),
+     needInputCount = retract,
+     isStateBackendDataViews = true)
+
+   val inputFieldTypes = inputRowType.getFieldList.asScala
+     .map(field => FlinkTypeFactory.toInternalType(field.getType))
+     .toArray
+
+   val genAggsHandler = new AggsHandlerCodeGenerator(
+       ctx,
+       relBuilder,
+       inputFieldTypes,
+       copyInputField = false)
+     .needRetract()
+     .needAccumulate()
+     // the constants must be handed to the generator for over aggregation
+     .withConstants(constants)
+     .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
+
+   val accTypes = aggInfoList.getAccTypes.map(TypeConverters.createInternalTypeFromTypeInfo)
+   val minRetention = tableConfig.getMinIdleStateRetentionTime
+   val maxRetention = tableConfig.getMaxIdleStateRetentionTime
+
+   (rowTimeIdx, isRowsClause) match {
+     case (Some(timeIdx), true) =>
+       new RowTimeRowsBoundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes,
+         precedingOffset, timeIdx)
+     case (Some(timeIdx), false) =>
+       new RowTimeRangeBoundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes,
+         precedingOffset, timeIdx)
+     case (None, true) =>
+       new ProcTimeRowsBoundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes,
+         precedingOffset)
+     case (None, false) =>
+       new ProcTimeRangeBoundedPrecedingFunction(
+         minRetention, maxRetention, genAggsHandler, accTypes, inputFieldTypes,
+         precedingOffset)
+   }
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 7cc9646..bd5dcec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.util
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.`type`.InternalTypes._
-import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, RowType, TypeConverters}
+import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, TypeConverters}
import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
@@ -34,8 +34,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
-import org.apache.flink.table.typeutils.{BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
-
+import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
import org.apache.calcite.rel.`type`._
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rex.RexInputRef
@@ -43,7 +42,6 @@ import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.validate.SqlMonotonicity
import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
import org.apache.calcite.tools.RelBuilder
-
import java.util
import scala.collection.JavaConversions._
@@ -490,7 +488,7 @@ object AggregateUtil extends Enumeration {
s"Please re-check the data type.")
}
} else {
- TypeConverters.createExternalTypeInfoFromInternalType(new RowType(argTypes: _*))
+ new BaseRowTypeInfo(argTypes: _*)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
index 4017b52..9320f19 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.plan.util
import org.apache.flink.table.JArrayList
-
import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
import org.apache.calcite.rel.core.Window
import org.apache.calcite.rel.core.Window.Group
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..71b05ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.util.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+
+  private val streamUtil = streamTestUtil()
+  streamUtil.addDataStream[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime)
+
+  /**
+   * All aggregates must be computed on the same window.
+   */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+
+    // the two OVER clauses differ in PARTITION BY; toAppendStream forces plan translation,
+    // which is where more than one window group is rejected
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  /**
+   * OVER clause is necessary for [[OverAgg0]] window function.
+   */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation(): Unit = {
+    streamUtil.addFunction("overAgg", new OverAgg0)
+
+    // Query the registered table T1: an unknown table name also fails validation, which
+    // would let this test pass without ever checking the missing OVER clause.
+    val sqlQuery = "SELECT overAgg(c, a) FROM T1"
+
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
index e9b5a77..0905cbd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
@@ -417,6 +417,15 @@ class OverWindowAggregateTest extends TableTestBase {
|FROM MyTable
""".stripMargin
+ val sql2 = "SELECT " +
+ "a, " +
+ "SUM(c) OVER w1, " +
+ "MIN(c) OVER w2 " +
+ "FROM MyTable " +
+ "WINDOW w1 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)," +
+ "w2 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
+
+ verifyPlanIdentical(sql, sql2)
util.verifyPlan(sql)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
new file mode 100644
index 0000000..5b50b7b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.{CheckpointingOptions, Configuration}
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.runtime.utils.StreamingTestBase
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
+
+  private val classLoader = Thread.currentThread.getContextClassLoader
+
+  /** Creates the state backend chosen by the test's [[StateBackendMode]] parameter. */
+  protected def getStateBackend: StateBackend = mode match {
+    case HEAP_BACKEND =>
+      val heapConf = new Configuration()
+      heapConf.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, true)
+      new MemoryStateBackend().configure(heapConf, classLoader)
+    case ROCKSDB_BACKEND =>
+      val checkpointDir = tempFolder.newFolder().getAbsoluteFile
+      new RocksDBStateBackend("file://" + checkpointDir)
+  }
+
+  /** Wraps `operator` in a keyed test harness that uses this test's state backend. */
+  def createHarnessTester[IN, OUT, KEY](
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, KEY],
+      keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+    val tester =
+      new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
+    tester.setStateBackend(getStateBackend)
+    tester
+  }
+
+  /**
+   * Locates the one-input transformation of `ds` whose name starts with
+   * `prefixOperatorName` and builds a keyed harness around its operator.
+   */
+  def createHarnessTester(
+      ds: DataStream[_],
+      prefixOperatorName: String)
+    : KeyedOneInputStreamOperatorTestHarness[BaseRow, BaseRow, BaseRow] = {
+
+    val found = extractExpectedTransformation(ds.javaStream.getTransformation, prefixOperatorName)
+    val tester = createHarnessTester(
+      found.getOperator.asInstanceOf[OneInputStreamOperator[Any, Any]],
+      found.getStateKeySelector.asInstanceOf[KeySelector[Any, Any]],
+      found.getStateKeyType.asInstanceOf[TypeInformation[Any]])
+    tester.asInstanceOf[KeyedOneInputStreamOperatorTestHarness[BaseRow, BaseRow, BaseRow]]
+  }
+
+  /** Walks up a chain of one-input transformations until a name matches the prefix. */
+  private def extractExpectedTransformation(
+      t: StreamTransformation[_],
+      prefixOperatorName: String): OneInputTransformation[_, _] = t match {
+    case one: OneInputTransformation[_, _] if one.getName.startsWith(prefixOperatorName) =>
+      one
+    case one: OneInputTransformation[_, _] =>
+      extractExpectedTransformation(one.getInput, prefixOperatorName)
+    case _ =>
+      throw new Exception(s"Can not find the expected $prefixOperatorName transformation")
+  }
+
+  /** Returns the elements with every [[Watermark]] removed. */
+  def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] =
+    elements.filterNot(_.isInstanceOf[Watermark]).toList
+}
+
+object HarnessTestBase {
+
+  /** JUnit parameters: every harness test runs once per listed state backend. */
+  @Parameterized.Parameters(name = "StateBackend={0}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    val backends = Seq[Array[AnyRef]](Array(HEAP_BACKEND), Array(ROCKSDB_BACKEND))
+    backends
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
new file mode 100644
index 0000000..e515138
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -0,0 +1,953 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.util.StreamRecordUtils.baserow
+import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow
+import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
+
+ /**
+ * Proc-time OVER window with `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW`, partitioned by `b`.
+ * Every emitted row carries the four input fields followed by min(c) and max(c) computed over
+ * the current row and the row before it in the same partition.
+ */
+ @Test
+ def testProcTimeBoundedRowsOver(): Unit = {
+
+ // `data` stays empty: it only serves to build the plan, records are pushed into the harness.
+ val data = new mutable.MutableList[(Long, String, Long)]
+ val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT currtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // The OverAggregate operator emits (currtime, b, c, proctime, min(c), max(c)), i.e. six
+ // fields, so exactly six field types are declared (same as the other proc-time tests).
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ // register cleanup timer with 3001
+ testHarness.setProcessingTime(1)
+
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 1L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "bbb", 10L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 2L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 3L: JLong, null)))
+
+ // register cleanup timer with 4100
+ testHarness.setProcessingTime(1100)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "bbb", 20L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 4L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 5L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "aaa", 6L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(1L: JLong, "bbb", 30L: JLong, null)))
+
+ // register cleanup timer with 6001
+ testHarness.setProcessingTime(3001)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "aaa", 7L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "aaa", 8L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "aaa", 9L: JLong, null)))
+
+ // trigger cleanup timer and register cleanup timer with 9002
+ // NOTE(review): the expectations below (aaa 10 -> min 9, bbb 40 -> min 30) show the earlier
+ // rows were still available at 6002, i.e. no state clearing is observed here — confirm
+ // whether this comment or the expected output reflects the intended behavior.
+ testHarness.setProcessingTime(6002)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "aaa", 10L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "bbb", 40L: JLong, null)))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 3L: JLong, null, 2L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 4L: JLong, null, 3L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 5L: JLong, null, 4L: JLong, 5L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "aaa", 6L: JLong, null, 5L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(1L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "aaa", 7L: JLong, null, 6L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "bbb", 40L: JLong, null, 30L: JLong, 40L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+
+ testHarness.close()
+ }
+
+ /**
+ * Proc-time OVER window with `RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW`,
+ * partitioned by `b`; each emitted row is the four input fields followed by min(c) and max(c).
+ * The final part checks that a stale clean-up timer firing does not fail with an NPE.
+ *
+ * NOTE: all elements at the same proc timestamp have the same value per key
+ */
+ @Test
+ def testProcTimeBoundedRangeOver(): Unit = {
+
+ val data = new mutable.MutableList[(Long, String, Long)]
+ val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT currtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY proctime
+ | RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY proctime
+ | RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // six output fields: currtime, b, c, proctime, min(c), max(c)
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+ testHarness.open()
+
+ // register cleanup timer with 3003
+ testHarness.setProcessingTime(3)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
+
+ testHarness.setProcessingTime(4)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 2L: JLong, null)))
+
+ // trigger cleanup timer and register cleanup timer with 6003
+ testHarness.setProcessingTime(3003)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 3L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 20L: JLong, null)))
+
+ // NOTE(review): processing time moves backwards here (3003 -> 5); confirm this is intended.
+ testHarness.setProcessingTime(5)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 4L: JLong, null)))
+
+ // register cleanup timer with 9002
+ // NOTE(review): no element is processed at 6002 itself; the next registration presumably
+ // happens for the elements at 7002 below — confirm the timer value stated above.
+ testHarness.setProcessingTime(6002)
+
+ testHarness.setProcessingTime(7002)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 5L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 6L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 30L: JLong, null)))
+
+ // register cleanup timer with 14002
+ testHarness.setProcessingTime(11002)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
+
+ testHarness.setProcessingTime(11004)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
+
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
+
+ // advance processing time so the elements buffered at 11004 are emitted before reading output
+ testHarness.setProcessingTime(11006)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same proc timestamp have the same value per key
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 5L: JLong, null, 3L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 6L: JLong, null, 3L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 7L: JLong, null, 5L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 9L: JLong, null, 7L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 10L: JLong, null, 7L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+
+ // test for clean-up timer NPE
+ testHarness.setProcessingTime(20000)
+
+ // timer registered for 23000
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
+
+ // update clean-up timer to 25500. Previous timer should not clean up
+ testHarness.setProcessingTime(22500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
+
+ // 23000 clean-up timer should fire but not fail with an NPE
+ testHarness.setProcessingTime(23001)
+
+ testHarness.close()
+ }
+
+ /**
+ * Proc-time OVER window with `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, partitioned
+ * by `b`: each emitted row is the four input fields followed by the running min(c) and max(c)
+ * of its partition.
+ */
+ @Test
+ def testProcTimeUnboundedOver(): Unit = {
+
+ val data = new mutable.MutableList[(Long, String, Long)]
+ val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT currtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // six output fields: currtime, b, c, proctime, min(c), max(c)
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ // register cleanup timer with 4003
+ testHarness.setProcessingTime(1003)
+
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 2L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 3L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 20L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 4L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 5L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 6L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 30L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
+
+ // trigger cleanup timer and register cleanup timer with 8003
+ // NOTE(review): the expected rows for the inputs below still report min(c) = 1 for aaa and
+ // 10 for bbb (values seen at 1003), i.e. no state clearing is observed at 5003 — confirm
+ // whether this comment or the expected output reflects the intended behavior.
+ testHarness.setProcessingTime(5003)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 5L: JLong, null, 1L: JLong, 5L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 6L: JLong, null, 1L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 30L: JLong, null, 10L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 7L: JLong, null, 1L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 8L: JLong, null, 1L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 9L: JLong, null, 1L: JLong, 9L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "aaa", 10L: JLong, null, 1L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(0L: JLong, "bbb", 40L: JLong, null, 10L: JLong, 40L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+ testHarness.close()
+ }
+
+ /**
+ * Row-time OVER window with `RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW`,
+ * partitioned by `b`; each emitted row is (rowtime, b, c, min(c), max(c)). The second half
+ * checks that idle-state clean-up only removes state once all buffered data was emitted.
+ *
+ * all elements at the same row-time have the same value per key
+ */
+ @Test
+ def testRowTimeBoundedRangeOver(): Unit = {
+
+ val data = new mutable.MutableList[(Long, String, Long)]
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT rowtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // five output fields: rowtime, b, c, min(c), max(c)
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ // Each element's rowtime is one past the preceding watermark, so none of them is late.
+ testHarness.processWatermark(1)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2L: JLong, "aaa", 1L: JLong)))
+
+ testHarness.processWatermark(2)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(3L: JLong, "bbb", 10L: JLong)))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+
+ testHarness.processWatermark(4001)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4002L: JLong, "aaa", 3L: JLong)))
+
+ testHarness.processWatermark(4002)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4003L: JLong, "aaa", 4L: JLong)))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4801L: JLong, "bbb", 25L: JLong)))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+ testHarness.processWatermark(19000)
+
+ // test cleanup
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(20000)
+
+ // check that state is removed after max retention time
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
+ testHarness.processWatermark(20010) // compute output
+
+ testHarness.setProcessingTime(4499)
+ testHarness.setProcessingTime(4500)
+
+ // check that state is only removed if all data was processed
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
+
+ testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+
+ testHarness.processWatermark(20020) // schedule emission
+
+ testHarness.setProcessingTime(8499) // clean-up
+ testHarness.setProcessingTime(8500) // clean-up
+
+ // watermarks are forwarded to the output as well; only data records are compared
+ val result = dropWatermarks(testHarness.getOutput.toArray)
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same row-time have the same value per key
+ expectedOutput.add(new StreamRecord(
+ baserow(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
+
+ expectedOutput.add(new StreamRecord(
+ baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+ testHarness.close()
+ }
+
+ /**
+ * Row-time OVER window with `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`, partitioned by `b`;
+ * each emitted row is (rowtime, b, c, min(c), max(c)) over the current row and the two rows
+ * before it. The second half checks idle-state clean-up, as in the range variant.
+ */
+ @Test
+ def testRowTimeBoundedRowsOver(): Unit = {
+
+ val data = new mutable.MutableList[(Long, String, Long)]
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT rowtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // five output fields: rowtime, b, c, min(c), max(c)
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ // Each element's rowtime is one past the preceding watermark, so none of them is late.
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+ testHarness.processWatermark(19000)
+
+ // test cleanup
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(20000)
+
+ // check that state is removed after max retention time
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
+ testHarness.processWatermark(20010) // compute output
+
+ testHarness.setProcessingTime(4499)
+ testHarness.setProcessingTime(4500)
+
+ // check that state is only removed if all data was processed
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
+
+ testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+
+ testHarness.processWatermark(20020) // schedule emission
+
+ testHarness.setProcessingTime(8499) // clean-up
+ testHarness.setProcessingTime(8500) // clean-up
+
+
+ // watermarks are forwarded to the output as well; only data records are compared
+ val result = dropWatermarks(testHarness.getOutput.toArray)
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong)))
+
+ expectedOutput.add(new StreamRecord(
+ baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+ testHarness.close()
+ }
+
+ /**
+ * Row-time OVER window with `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, partitioned
+ * by `b`; each emitted row is (rowtime, b, c, min(c), max(c)). Also checks that late data
+ * (rowtime not past the current watermark) produces no output and that clean-up is deferred
+ * while data is still buffered.
+ *
+ * all elements at the same row-time have the same value per key
+ */
+ @Test
+ def testRowTimeUnboundedRangeOver(): Unit = {
+
+ val data = new mutable.MutableList[(Long, String, Long)]
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT rowtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ // five output fields: rowtime, b, c, min(c), max(c)
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+ testHarness.processWatermark(19000)
+
+ // test cleanup
+ testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+ testHarness.setProcessingTime(3000) // clean up is triggered
+
+ testHarness.processWatermark(20000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20000L: JLong, "ccc", 1L: JLong))) // test for late data
+
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
+ // NOTE(review): processing time moves backwards here (3000 -> 2500); confirm this is intended.
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
+
+ testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
+ testHarness.processWatermark(20010) // compute output
+
+ testHarness.setProcessingTime(6999) // clean up timer is 7000, so nothing should happen
+ testHarness.setProcessingTime(7000) // clean up is triggered
+
+ // watermarks are forwarded to the output as well; only data records are compared
+ val result = dropWatermarks(testHarness.getOutput.toArray)
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same row-time have the same value per key
+ expectedOutput.add(new StreamRecord(
+ baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
+
+ // the late element at rowtime 20000 produces no output row
+ expectedOutput.add(new StreamRecord(
+ baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+ testHarness.close()
+ }
+
+ /**
+ * Harness test for an event-time ROWS UNBOUNDED PRECEDING over window partitioned by `b`
+ * that computes min(c) and max(c), with idle-state retention configured as 1s (min) / 2s (max).
+ * It also checks that a row whose rowtime equals the current watermark (20000) is not emitted,
+ * and walks processing time across the cleanup-timer boundaries annotated below.
+ */
+ @Test
+ def testRowTimeUnboundedRowsOver(): Unit = {
+
+ // Empty collection: only used to derive the (rowtime, b, c) schema for the planner.
+ val data = new mutable.MutableList[(Long, String, Long)]
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+ tEnv.registerTable("T", t)
+
+ val sql =
+ """
+ |SELECT rowtime, b, c,
+ | min(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ | max(c) OVER
+ | (PARTITION BY b ORDER BY rowtime
+ | ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ |FROM T
+ """.stripMargin
+ val t1 = tEnv.sqlQuery(sql)
+
+ // min retention 1s, max retention 2s: drives the cleanup-timer values in the comments below.
+ tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+ val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+ val assertor = new BaseRowHarnessAssertor(
+ Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(1000)
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+ testHarness.processElement(new StreamRecord(
+ binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+ testHarness.processWatermark(19000)
+
+ // test cleanup
+ testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+ testHarness.setProcessingTime(3000) // clean up is triggered
+
+ testHarness.processWatermark(20000)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20000L: JLong, "ccc", 2L: JLong))) // test for late data (not in expected output)
+
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
+ // NOTE(review): processing time moves backwards here (3000 -> 2500); confirm this is intended.
+ testHarness.setProcessingTime(2500)
+ testHarness.processElement(new StreamRecord(
+ binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
+
+ testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
+ testHarness.processWatermark(20010) // compute output
+
+ testHarness.setProcessingTime(6999) // clean up timer is 7000, so nothing should happen
+ testHarness.setProcessingTime(7000) // clean up is triggered
+
+ val result = dropWatermarks(testHarness.getOutput.toArray)
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // ROWS semantics: rows sharing a rowtime within a key still get their own, incrementally
+ // growing aggregates (e.g. the two "aaa" rows at 4001 have max 2 and then 3).
+ expectedOutput.add(new StreamRecord(
+ baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
+
+ // Only the two non-late "ccc" rows are emitted; the row at rowtime 20000 is absent.
+ expectedOutput.add(new StreamRecord(
+ baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+ expectedOutput.add(new StreamRecord(
+ baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+
+ assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+ testHarness.close()
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..72ca720
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull, CountPairs, LargerThanCount}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
+
+  /**
+   * Shared (a, b, c) input rows for the proc-time tests that register table `T1`:
+   * six "Hello" rows with a = b = 1..6, then "Hello World" rows with a = b = 7, 8 and 20.
+   */
+  val data: List[(Long, Int, String)] =
+    (1 to 6).map(i => (i.toLong, i, "Hello")).toList ++
+      List(7, 8, 20).map(i => (i.toLong, i, "Hello World"))
+
+  /**
+   * Proc-time ROWS over partitioned by `a`, covering the current row and up to 4 preceding
+   * rows; checks SUM(c) and MIN(c) for every input row.
+   */
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    // Both aggregates share the same window definition.
+    val window = " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW"
+    val sqlQuery = "SELECT a, " +
+      " SUM(c) OVER (" + window + "), " +
+      " MIN(c) OVER (" + window + ") " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Rows formatted as "a,SUM(c),MIN(c)", one source line per partition `a`.
+    val expected = List(
+      "1,0,0",
+      "2,1,1", "2,3,1",
+      "3,3,3", "3,7,3", "3,12,3",
+      "4,6,6", "4,13,6", "4,21,6", "4,30,6",
+      "5,10,10", "5,21,10", "5,33,10", "5,46,10", "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Proc-time ROWS over (4 preceding) partitioned by `a`, checking SUM(c) and MIN(c).
+   *
+   * NOTE(review): this body is currently identical to testProcTimeBoundedPartitionedRowsOver
+   * and does not use any built-in proctime function, despite its name. The name also
+   * misspells "Builtin"; renaming is not possible because
+   * testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime already exists. Confirm the
+   * intended variant.
+   */
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverWithBultinProctime(): Unit = {
+    val table = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", table)
+
+    val over = "OVER ( PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)"
+    val sqlQuery = s"SELECT a,  SUM(c) $over,  MIN(c) $over FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1", "2,3,1",
+      "3,3,3", "3,7,3", "3,12,3",
+      "4,6,6", "4,13,6", "4,21,6", "4,30,6",
+      "5,10,10", "5,21,10", "5,33,10", "5,46,10", "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Proc-time ROWS over (4 preceding) partitioned by `a`, checking SUM(c) and MIN(c).
+   *
+   * NOTE(review): the query and table definition are identical to
+   * testProcTimeBoundedPartitionedRowsOver; nothing here exercises a built-in proctime
+   * function. Confirm what this test was meant to cover.
+   */
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime(): Unit = {
+    val source = failingDataSource(StreamTestData.get5TupleData)
+    tEnv.registerTable("MyTable", source.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime))
+
+    val frame = " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW"
+    val sqlQuery =
+      "SELECT a, " + s" SUM(c) OVER ($frame), " + s" MIN(c) OVER ($frame) " + "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0", "2,1,1", "2,3,1", "3,3,3", "3,7,3", "3,12,3",
+      "4,6,6", "4,13,6", "4,21,6", "4,30,6",
+      "5,10,10", "5,21,10", "5,33,10", "5,46,10", "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned proc-time ROWS over (10 preceding): first_value(d), last_value(d),
+   * SUM(c) and MIN(c) across the whole stream.
+   */
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val window = " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW"
+    val sqlQuery = "SELECT a, " +
+      " first_value(d) OVER (" + window + "), " +
+      " last_value(d) OVER (" + window + "), " +
+      " SUM(c) OVER (" + window + "), " +
+      " MIN(c) OVER (" + window + ") " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Format: "a,first_value(d),last_value(d),SUM(c),MIN(c)".
+    // The first 11 rows still see the very first row inside the 11-row frame.
+    val frameNotYetFull = List(
+      "1,Hallo,Hallo,0,0",
+      "2,Hallo,Hallo Welt,1,0",
+      "2,Hallo,Hallo Welt wie,3,0",
+      "3,Hallo,Hallo Welt wie gehts?,6,0",
+      "3,Hallo,ABC,10,0",
+      "3,Hallo,BCD,15,0",
+      "4,Hallo,CDE,21,0",
+      "4,Hallo,DEF,28,0",
+      "4,Hallo,EFG,36,0",
+      "4,Hallo,FGH,45,0",
+      "5,Hallo,GHI,55,0")
+    // From the 12th row on, the oldest row slides out of the frame.
+    val frameSliding = List(
+      "5,Hallo Welt,HIJ,66,1",
+      "5,Hallo Welt wie,IJK,77,2",
+      "5,Hallo Welt wie gehts?,JKL,88,3",
+      "5,ABC,KLM,99,4")
+    val expected = frameNotYetFull ++ frameSliding
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Proc-time RANGE UNBOUNDED PRECEDING over partitioned by `c`: first_value(b),
+   * last_value(b), count(a) and sum(a) for the shared `data` rows.
+   */
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    tEnv.registerTable("T1", failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime))
+
+    val w = "(PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)"
+    val sqlQuery = "SELECT " +
+      "c, " +
+      s"first_value(b) OVER $w," +
+      s"last_value(b) OVER $w," +
+      s"count(a) OVER $w, " +
+      s"sum(a) OVER $w " +
+      "from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val helloWorldRows = List(
+      "Hello World,7,7,1,7", "Hello World,7,8,2,15", "Hello World,7,20,3,35")
+    val helloRows = List(
+      "Hello,1,1,1,1", "Hello,1,2,2,3", "Hello,1,3,3,6",
+      "Hello,1,4,4,10", "Hello,1,5,5,15", "Hello,1,6,6,21")
+    val expected = helloWorldRows ++ helloRows
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Proc-time ROWS UNBOUNDED PRECEDING over partitioned by `c`: a running sum(1) per
+   * partition, plus max over an all-null varchar column, which must stay null.
+   */
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val input = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime)
+    tEnv.registerTable("T1", input)
+
+ val sql =
+ """
+ |SELECT c, sum1, maxnull
+ |FROM (
+ | SELECT c,
+ | max(cast(null as varchar)) OVER
+ | (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ | as maxnull,
+ | sum(1) OVER
+ | (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ | as sum1
+ | FROM T1
+ |)
+ """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // "Hello World" has 3 rows and "Hello" has 6; sum1 counts 1..n within each partition.
+    val expected =
+      ((1 to 3).map(n => s"Hello World,$n,null") ++ (1 to 6).map(n => s"Hello,$n,null")).toList
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned proc-time RANGE UNBOUNDED PRECEDING over with idle-state retention set:
+   * a running count(a) and sum(a) across all rows.
+   */
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    tEnv.getConfig.withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
+    tEnv.registerTable("T1", failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime))
+
+    val w = "OVER (ORDER BY proctime RANGE UNBOUNDED preceding)"
+    val sqlQuery = "SELECT " + "c, " + s"count(a) $w, " + s"sum(a) $w " + "from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Format: "c,count,sum", accumulated over the whole stream.
+    val expected =
+      List("Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21") ++
+        List("Hello World,7,28", "Hello World,8,36", "Hello World,9,56")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /** Non-partitioned proc-time ROWS UNBOUNDED PRECEDING: count(a) runs 1..9 over the input. */
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    tEnv.registerTable("T1", failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime))
+
+    val sqlQuery = "SELECT count(a) OVER " +
+      "(ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = (1 to 9).map(_.toString).toList
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Event-time RANGE over (1 second preceding) partitioned by `c`, with a UDAF (LTCNT),
+   * first_value/last_value, COUNT and SUM. The input interleaves out-of-order rows
+   * (Left) with watermarks (Right); their order is part of the test.
+   */
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val source = failingDataSource(data)
+    val timed = source
+      .transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+    tEnv.registerTable("T1", timed.toTable(tEnv, 'a, 'b, 'c, 'rowtime))
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    // The concatenation keeps the original double space between "RANGE" and "BETWEEN".
+    val w = "OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)"
+    val sqlQuery = "SELECT " +
+      " c, b, " +
+      s" LTCNT(a, CAST('4' AS BIGINT)) $w, " +
+      s" first_value(a, a) $w, " +
+      s" last_value(a, a) $w, " +
+      s" COUNT(a) $w, " +
+      s" SUM(a) $w" +
+      " FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val helloRows = List(
+      "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
+      "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
+      "Hello,3,0,2,3,4,9",
+      "Hello,4,0,3,4,2,7",
+      "Hello,5,1,4,5,2,9",
+      "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
+      "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18")
+    val helloWorldRows = List(
+      "Hello World,17,3,7,7,3,21",
+      "Hello World,7,1,7,7,1,7",
+      "Hello World,77,3,7,7,3,21",
+      "Hello World,18,1,7,7,1,7",
+      "Hello World,8,2,7,8,2,15",
+      "Hello World,20,1,20,20,1,20")
+    val expected = helloRows ++ helloWorldRows
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Event-time ROWS over (2 preceding) partitioned by `c`: LTCNT UDAF, COUNT(1) and SUM(a)
+   * on an out-of-order input interleaved with watermarks.
+   */
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val source = failingDataSource(data)
+    val timed = source
+      .transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+    tEnv.registerTable("T1", timed.toTable(tEnv, 'a, 'b, 'c, 'rowtime))
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val w = " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
+    val sqlQuery = "SELECT " +
+      " c, a, " +
+      " LTCNT(a, CAST('4' AS BIGINT)) " + w + ", " +
+      " COUNT(1) " + w + ", " +
+      " SUM(a) " + w + " " +
+      "FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Format: "c,a,LTCNT,COUNT,SUM" over at most 3 rows per partition.
+    val helloRows = List(
+      "Hello,1,0,1,1", "Hello,1,0,2,2", "Hello,1,0,3,3",
+      "Hello,2,0,3,4", "Hello,2,0,3,5", "Hello,2,0,3,6",
+      "Hello,3,0,3,7", "Hello,4,0,3,9", "Hello,5,1,3,12",
+      "Hello,6,2,3,15")
+    val helloWorldRows = List(
+      "Hello World,7,1,1,7", "Hello World,7,2,2,14", "Hello World,7,3,3,21",
+      "Hello World,7,3,3,21", "Hello World,8,3,3,22", "Hello World,20,3,3,35")
+    val expected = helloRows ++ helloWorldRows
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned event-time RANGE over (1 second preceding): COUNT(a) and SUM(a) on an
+   * out-of-order input interleaved with watermarks.
+   */
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val source = failingDataSource(data)
+    val timed = source
+      .transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+    tEnv.registerTable("T1", timed.toTable(tEnv, 'a, 'b, 'c, 'rowtime))
+
+    val w = " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)"
+    val sqlQuery = "SELECT " +
+      " c, b, " +
+      " COUNT(a) " + w + ", " +
+      " SUM(a) " + w + " " +
+      " FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Format: "c,b,COUNT,SUM" across all rows within the 1-second range.
+    val helloRows = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18")
+    val helloWorldRows = List(
+      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    val expected = helloRows ++ helloWorldRows
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned event-time ROWS over (2 preceding): COUNT(a) and SUM(a), including an
+   * early row (rowtime 20 before watermark 3) and a row that arrives after watermark 3.
+   */
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val source = failingDataSource(data)
+    val timed = source
+      .transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+    tEnv.registerTable("T1", timed.toTable(tEnv, 'a, 'b, 'c, 'rowtime))
+
+    val w = "OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)"
+    val sqlQuery = "SELECT " + "c, a, " + s" COUNT(a) $w, " + s" SUM(a) $w " + "FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Only 15 rows are expected although 16 rows are sent; the row marked "late" is absent.
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Event-time RANGE UNBOUNDED PRECEDING over partitioned by `a`: LTCNT, SUM, COUNT, AVG,
+   * MAX and MIN of `b`. Under RANGE semantics, rows of one partition that share a rowtime
+   * are peers and get identical aggregates (e.g. the three a=1 rows at 14000022).
+   */
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      " LTCNT(b, CAST('4' AS BIGINT)) OVER(" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " SUM(b) OVER (" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " COUNT(b) OVER (" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " AVG(b) OVER (" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MAX(b) OVER (" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MIN(b) OVER (" +
+      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    // Each Left carries an explicit (timestamp, row) tuple instead of relying on auto-tupling
+    // of Left's single argument, consistent with the other tests in this class.
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left((14000005L, (1, 1L, "Hi"))),
+      Left((14000000L, (2, 1L, "Hello"))),
+      Left((14000002L, (1, 1L, "Hello"))),
+      Left((14000002L, (1, 2L, "Hello"))),
+      Left((14000002L, (1, 3L, "Hello world"))),
+      Left((14000003L, (2, 2L, "Hello world"))),
+      Left((14000003L, (2, 3L, "Hello world"))),
+      Right(14000020L),
+      Left((14000021L, (1, 4L, "Hello world"))),
+      Left((14000022L, (1, 5L, "Hello world"))),
+      Left((14000022L, (1, 6L, "Hello world"))),
+      Left((14000022L, (1, 7L, "Hello world"))),
+      Left((14000023L, (2, 4L, "Hello world"))),
+      Left((14000023L, (2, 5L, "Hello world"))),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      s"1,1,Hello,0,6,3,${6.0/3},3,1",
+      s"1,2,Hello,0,6,3,${6.0/3},3,1",
+      s"1,3,Hello world,0,6,3,${6.0/3},3,1",
+      s"1,1,Hi,0,7,4,${7.0/4},3,1",
+      s"2,1,Hello,0,1,1,${1.0/1},1,1",
+      s"2,2,Hello world,0,6,3,${6.0/3},3,1",
+      s"2,3,Hello world,0,6,3,${6.0/3},3,1",
+      s"1,4,Hello world,0,11,5,${11.0/5},4,1",
+      s"1,5,Hello world,3,29,8,${29.0/8},7,1",
+      s"1,6,Hello world,3,29,8,${29.0/8},7,1",
+      s"1,7,Hello world,3,29,8,${29.0/8},7,1",
+      s"2,4,Hello world,1,15,5,${15.0/5},5,1",
+      s"2,5,Hello world,1,15,5,${15.0/5},5,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Event-time ROWS UNBOUNDED PRECEDING over partitioned by `a`: LTCNT, SUM, count, avg,
+   * max and min of `b`. Rows are emitted in rowtime order per partition, so the out-of-order
+   * a=1 rows (14000022, 14000025, 14000026) accumulate in that order.
+   */
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "LTCNT(b, CAST('4' AS BIGINT)) over(" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    // Explicit (timestamp, row) tuples inside Left: avoids relying on auto-tupling.
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left((14000005L, (1, 1L, "Hi"))),
+      Left((14000000L, (2, 1L, "Hello"))),
+      Left((14000002L, (3, 1L, "Hello"))),
+      Left((14000003L, (1, 2L, "Hello"))),
+      Left((14000004L, (1, 3L, "Hello world"))),
+      Left((14000007L, (3, 2L, "Hello world"))),
+      Left((14000008L, (2, 2L, "Hello world"))),
+      Right(14000010L),
+      Left((14000012L, (1, 5L, "Hello world"))),
+      Left((14000021L, (1, 6L, "Hello world"))),
+      Left((14000023L, (2, 5L, "Hello world"))),
+      Right(14000020L),
+      Left((14000024L, (3, 5L, "Hello world"))),
+      Left((14000026L, (1, 7L, "Hello world"))),
+      Left((14000025L, (1, 8L, "Hello world"))),
+      Left((14000022L, (1, 9L, "Hello world"))),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    // Immutable expected list; it is only read, never mutated.
+    val expected = List(
+      s"1,2,Hello,0,2,1,${2.0/1},2,2",
+      s"1,3,Hello world,0,5,2,${5.0/2},3,2",
+      s"1,1,Hi,0,6,3,${6.0/3},3,1",
+      s"2,1,Hello,0,1,1,${1.0/1},1,1",
+      s"2,2,Hello world,0,3,2,${3.0/2},2,1",
+      s"3,1,Hello,0,1,1,${1.0/1},1,1",
+      s"3,2,Hello world,0,3,2,${3.0/2},2,1",
+      s"1,5,Hello world,1,11,4,${11.0/4},5,1",
+      s"1,6,Hello world,2,17,5,${17.0/5},6,1",
+      s"1,9,Hello world,3,26,6,${26.0/6},9,1",
+      s"1,8,Hello world,4,34,7,${34.0/7},9,1",
+      s"1,7,Hello world,5,41,8,${41.0/8},9,1",
+      s"2,5,Hello world,1,8,3,${8.0/3},5,1",
+      s"3,5,Hello world,1,8,3,${8.0/3},5,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned event-time RANGE UNBOUNDED PRECEDING over: SUM, COUNT, AVG, MAX and MIN
+   * of `b`. Rows sharing a rowtime are peers and receive identical aggregates. The sink runs
+   * with parallelism 1.
+   */
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      " SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    // Explicit (timestamp, row) tuples inside Left: avoids relying on auto-tupling.
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left((14000005L, (1, 1L, "Hi"))),
+      Left((14000000L, (2, 1L, "Hello"))),
+      Left((14000002L, (1, 1L, "Hello"))),
+      Left((14000002L, (1, 2L, "Hello"))),
+      Left((14000002L, (1, 3L, "Hello world"))),
+      Left((14000003L, (2, 2L, "Hello world"))),
+      Left((14000003L, (2, 3L, "Hello world"))),
+      Right(14000020L),
+      Left((14000021L, (1, 4L, "Hello world"))),
+      Left((14000022L, (1, 5L, "Hello world"))),
+      Left((14000022L, (1, 6L, "Hello world"))),
+      Left((14000022L, (1, 7L, "Hello world"))),
+      Left((14000023L, (2, 4L, "Hello world"))),
+      Left((14000023L, (2, 5L, "Hello world"))),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      s"2,1,Hello,1,1,${1.0/1},1,1",
+      s"1,1,Hello,7,4,${7.0/4},3,1",
+      s"1,2,Hello,7,4,${7.0/4},3,1",
+      s"1,3,Hello world,7,4,${7.0/4},3,1",
+      s"2,2,Hello world,12,6,${12.0/6},3,1",
+      s"2,3,Hello world,12,6,${12.0/6},3,1",
+      s"1,1,Hi,13,7,${13.0/7},3,1",
+      s"1,4,Hello world,17,8,${17.0/8},4,1",
+      s"1,5,Hello world,35,11,${35.0/11},7,1",
+      s"1,6,Hello world,35,11,${35.0/11},7,1",
+      s"1,7,Hello world,35,11,${35.0/11},7,1",
+      s"2,4,Hello world,44,13,${44.0/13},7,1",
+      s"2,5,Hello world,44,13,${44.0/13},7,1")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+   * Non-partitioned event-time ROWS UNBOUNDED PRECEDING over: SUM, COUNT, AVG, MAX and MIN
+   * of `b`. A row whose timestamp (14000008) is below the preceding watermark (14000010) must
+   * not contribute to any result.
+   */
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      " SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      " MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    // Explicit (timestamp, row) tuples inside Left: avoids relying on auto-tupling.
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left((14000005L, (1, 1L, "Hi"))),
+      Left((14000000L, (2, 2L, "Hello"))),
+      Left((14000002L, (3, 5L, "Hello"))),
+      Left((14000003L, (1, 3L, "Hello"))),
+      Left((14000004L, (3, 7L, "Hello world"))),
+      Left((14000007L, (4, 9L, "Hello world"))),
+      Left((14000008L, (5, 8L, "Hello world"))),
+      Right(14000010L),
+      // this element is discarded because it is late (its rowtime is below the watermark)
+      Left((14000008L, (6, 8L, "Hello world"))),
+      Right(14000020L),
+      Left((14000021L, (6, 8L, "Hello world"))),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    // The final row has count 8, not 9: the late row above is not counted.
+    val expected = List(
+      s"2,2,Hello,2,1,${2.0/1},2,2",
+      s"3,5,Hello,7,2,${7.0/2},5,2",
+      s"1,3,Hello,10,3,${10.0/3},5,2",
+      s"3,7,Hello world,17,4,${17.0/4},7,2",
+      s"1,1,Hi,18,5,${18.0/5},7,1",
+      s"4,9,Hello world,27,6,${27.0/6},9,1",
+      s"5,8,Hello world,35,7,${35.0/7},9,1",
+      s"6,8,Hello world,43,8,${43.0/8},9,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+ /**
+ * Sliding event-time OVER window partitioned by `a`, using
+ * ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Rows that arrive with a timestamp
+ * not after the last watermark are late and must not appear in the expected output.
+ */
+ @Test
+ def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+ val rows: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ // late: timestamp 14000008 is behind watermark 14000010
+ Left(14000008L, (1, 4L, "Hello world")),
+ Left(14000008L, (2, 3L, "Hello world")),
+ Left(14000008L, (3, 3L, "Hello world")),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ // late: timestamps 14000019 / 14000018 are behind watermark 14000020
+ Left(14000019L, (1, 6L, "Hello world")),
+ Left(14000018L, (2, 4L, "Hello world")),
+ Left(14000018L, (3, 4L, "Hello world")),
+ Left(14000022L, (2, 5L, "Hello world")),
+ Left(14000022L, (3, 5L, "Hello world")),
+ Left(14000024L, (1, 7L, "Hello world")),
+ Left(14000023L, (1, 8L, "Hello world")),
+ Left(14000021L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val input = failingDataSource(rows)
+ val timedTable = input
+ .transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+ .setParallelism(input.parallelism)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+ tEnv.registerTable("T1", timedTable)
+
+ val query = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
+ "from T1"
+
+ val resultSink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(resultSink)
+ env.execute()
+
+ val expectedRows = List(
+ s"1,2,Hello,2,1,${2.0/1},2,2",
+ s"1,3,Hello world,5,2,${5.0/2},3,2",
+ s"1,1,Hi,6,3,${6.0/3},3,1",
+ s"2,1,Hello,1,1,${1.0/1},1,1",
+ s"2,2,Hello world,3,2,${3.0/2},2,1",
+ s"3,1,Hello,1,1,${1.0/1},1,1",
+ s"3,2,Hello world,3,2,${3.0/2},2,1",
+ s"1,5,Hello world,11,4,${11.0/4},5,1",
+ s"1,6,Hello world,17,5,${17.0/5},6,1",
+ s"1,9,Hello world,26,6,${26.0/6},9,1",
+ s"1,8,Hello world,34,7,${34.0/7},9,1",
+ s"1,7,Hello world,41,8,${41.0/8},9,1",
+ s"2,5,Hello world,8,3,${8.0/3},5,1",
+ s"3,5,Hello world,8,3,${8.0/3},5,1"
+ )
+ assertEquals(expectedRows.sorted, resultSink.getAppendResults.sorted)
+ }
+
+ /**
+ * Mixes a plain COUNT with DISTINCT SUM / MIN over a processing-time unbounded
+ * RANGE window partitioned by `a`.
+ */
+ @Test
+ def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+ val query = "SELECT a, " +
+ " COUNT(e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ " SUM(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ " MIN(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+ "FROM MyTable"
+
+ val source = failingDataSource(StreamTestData.get5TupleData)
+ tEnv.registerTable("MyTable", source.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime))
+
+ val resultSink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(resultSink)
+ env.execute()
+
+ val expectedRows = List(
+ "1,1,1,1",
+ "2,1,2,2",
+ "2,2,3,1",
+ "3,1,2,2",
+ "3,2,2,2",
+ "3,3,5,2",
+ "4,1,2,2",
+ "4,2,3,1",
+ "4,3,3,1",
+ "4,4,3,1",
+ "5,1,1,1",
+ "5,2,4,1",
+ "5,3,4,1",
+ "5,4,6,1",
+ "5,5,6,1")
+ assertEquals(expectedRows.sorted, resultSink.getAppendResults.sorted)
+ }
+
+ /**
+ * DISTINCT aggregates over an event-time unbounded RANGE window partitioned by `b`,
+ * where the distinct column `c` contains nulls. COUNT(DISTINCT c) ignores nulls; judging
+ * by the expected rows, CntNullNonNull presumably reports "<non-null count>|<null count>"
+ * over the distinct values (TODO confirm against CountNullNonNull).
+ */
+ @Test
+ def testRowTimeDistinctUnboundedPartitionedRangeOverWithNullValues(): Unit = {
+ val data = List(
+ (1L, 1, null),
+ (2L, 1, null),
+ (3L, 2, null),
+ (4L, 1, "Hello"),
+ (5L, 1, "Hello"),
+ (6L, 2, "Hello"),
+ (7L, 1, "Hello World"),
+ (8L, 2, "Hello World"),
+ (9L, 2, "Hello World"),
+ (10L, 1, null))
+
+ // run with parallelism 1 so every element reaches the over-aggregate in a deterministic order
+ env.setParallelism(1)
+
+ val table = failingDataSource(data)
+ .assignAscendingTimestamps(_._1)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+ tEnv.registerTable("MyTable", table)
+ tEnv.registerFunction("CntNullNonNull", new CountNullNonNull)
+
+ // keep a space after the closing parenthesis so the fragments do not run into "FROM"
+ val sqlQuery = "SELECT " +
+ " c, " +
+ " b, " +
+ " COUNT(DISTINCT c) " +
+ " OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding), " +
+ " CntNullNonNull(DISTINCT c) " +
+ " OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding) " +
+ "FROM " +
+ " MyTable"
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "null,1,0,0|1", "null,1,0,0|1", "null,2,0,0|1", "null,1,2,2|1",
+ "Hello,1,1,1|1", "Hello,1,1,1|1", "Hello,2,1,1|1",
+ "Hello World,1,2,2|1", "Hello World,2,2,2|1", "Hello World,2,2,2|1")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ /**
+ * DISTINCT SUM, MIN and COLLECT over a processing-time ROWS window that covers the
+ * current row plus the 3 preceding rows of each partition `a`.
+ */
+ @Test
+ def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+ val query = "SELECT a, " +
+ " SUM(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+ " MIN(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+ " COLLECT(DISTINCT e) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " +
+ "FROM MyTable"
+
+ val source = failingDataSource(StreamTestData.get5TupleData)
+ tEnv.registerTable("MyTable", source.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime))
+
+ val resultSink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(resultSink)
+ env.execute()
+
+ val expectedRows = List(
+ "1,1,1,{1=1}",
+ "2,2,2,{2=1}",
+ "2,3,1,{1=1, 2=1}",
+ "3,2,2,{2=1}",
+ "3,2,2,{2=1}",
+ "3,5,2,{2=1, 3=1}",
+ "4,2,2,{2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "4,3,1,{1=1, 2=1}",
+ "5,1,1,{1=1}",
+ "5,4,1,{1=1, 3=1}",
+ "5,4,1,{1=1, 3=1}",
+ "5,6,1,{1=1, 2=1, 3=1}",
+ "5,5,2,{2=1, 3=1}")
+ assertEquals(expectedRows.sorted, resultSink.getAppendResults.sorted)
+ }
+
+ /**
+ * A two-argument UDAF (PairCount) over a non-partitioned processing-time unbounded
+ * RANGE window, once plain and once with DISTINCT, on pairs that contain nulls.
+ * Parallelism 1 keeps the emission order stable, so the result is compared unsorted.
+ */
+ @Test
+ def testProcTimeDistinctPairWithNulls(): Unit = {
+ env.setParallelism(1)
+
+ val pairs = List(
+ ("A", null),
+ ("A", null),
+ ("B", null),
+ (null, "Hello"),
+ ("A", "Hello"),
+ ("A", "Hello"),
+ (null, "Hello World"),
+ (null, "Hello World"),
+ ("A", "Hello World"),
+ ("B", "Hello World"))
+
+ tEnv.registerTable("MyTable", env.fromCollection(pairs).toTable(tEnv, 'a, 'b, 'proctime))
+ tEnv.registerFunction("PairCount", new CountPairs)
+
+ val query = "SELECT a, b, " +
+ " PairCount(a, b) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ " PairCount(DISTINCT a, b) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
+ "FROM MyTable"
+
+ val resultSink = new TestingAppendSink
+ tEnv.sqlQuery(query).toAppendStream[Row].addSink(resultSink)
+ env.execute()
+
+ val expectedRows = List(
+ "A,null,1,1",
+ "A,null,2,1",
+ "B,null,3,2",
+ "null,Hello,4,3",
+ "A,Hello,5,4",
+ "A,Hello,6,4",
+ "null,Hello World,7,5",
+ "null,Hello World,8,5",
+ "A,Hello World,9,6",
+ "B,Hello World,10,7")
+ assertEquals(expectedRows, resultSink.getAppendResults)
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
index a761c5f..5c8217c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import java.io.IOException;
+
/**
* A function that processes elements of a stream, and could cleanup state.
* @param <K> Type of the key.
@@ -79,4 +81,13 @@ public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
this.cleanupTimeState.clear();
}
+ /**
+ * Checks whether a fired timer is the currently registered state-cleanup timer.
+ *
+ * @param timestamp the timestamp of the fired timer
+ * @return true if state cleaning is enabled and {@code timestamp} equals the cleanup time
+ * stored in the cleanup-time state; false otherwise, including when no cleanup
+ * time has been stored yet
+ * @throws IOException if the cleanup-time state cannot be read
+ */
+ protected Boolean needToCleanupState(Long timestamp) throws IOException {
+ if (stateCleaningEnabled) {
+ Long cleanupTime = cleanupTimeState.value();
+ // check that the triggered timer is the last registered processing time timer.
+ // Both operands are boxed Longs: compare by value, since '==' would compare
+ // references and fail for any timestamp outside the Long cache (-128..127).
+ return cleanupTime != null && cleanupTime.equals(timestamp);
+ } else {
+ return false;
+ }
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
new file mode 100644
index 0000000..0362976
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * A basic implementation to support unbounded event-time over-window.
+ *
+ * <p>Input rows are buffered in a map state keyed by their row time until the watermark passes
+ * them. When an event-time timer fires, all buffered timestamps that are not greater than the
+ * current watermark are sorted, and the rows of each timestamp are handed, in ascending timestamp
+ * order, to {@link #processElementsWithSameTimestamp(List, Collector)}. Subclasses decide how rows
+ * sharing one timestamp are aggregated (ROWS vs RANGE semantics). Rows whose timestamp is not
+ * greater than the watermark when they arrive are late and are dropped.
+ *
+ * @param <K> Type of the key.
+ */
+public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRowTimeUnboundedPrecedingOver.class);
+
+ private final GeneratedAggsHandleFunction genAggsHandler;
+ private final InternalType[] accTypes;
+ private final InternalType[] inputFieldTypes;
+ private final int rowTimeIdx;
+
+ // reusable output row; presumably filled by subclasses as (input row, aggregate values)
+ protected transient JoinedRow output;
+ // state to hold the accumulators of the aggregations
+ private transient ValueState<BaseRow> accState;
+ // state to hold rows until the next watermark arrives, keyed by their row time
+ private transient MapState<Long, List<BaseRow>> inputState;
+ // list to sort timestamps to access rows in timestamp order
+ private transient LinkedList<Long> sortedTimestamps;
+
+ // the generated aggregation handler, instantiated in open()
+ protected transient AggsHandleFunction function;
+
+ /**
+ * Creates the function.
+ *
+ * @param minRetentionTime minimal state retention time, passed to the cleanup-state base class
+ * @param maxRetentionTime maximal state retention time, passed to the cleanup-state base class
+ * @param genAggsHandler generated code of the aggregation handler
+ * @param accTypes field types of the accumulator row
+ * @param inputFieldTypes field types of the input row
+ * @param rowTimeIdx index of the row-time field in the input row
+ */
+ public AbstractRowTimeUnboundedPrecedingOver(
+ long minRetentionTime,
+ long maxRetentionTime,
+ GeneratedAggsHandleFunction genAggsHandler,
+ InternalType[] accTypes,
+ InternalType[] inputFieldTypes,
+ int rowTimeIdx) {
+ super(minRetentionTime, maxRetentionTime);
+ this.genAggsHandler = genAggsHandler;
+ this.accTypes = accTypes;
+ this.inputFieldTypes = inputFieldTypes;
+ this.rowTimeIdx = rowTimeIdx;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+ output = new JoinedRow();
+
+ sortedTimestamps = new LinkedList<Long>();
+
+ // initialize accumulator state
+ BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+ ValueStateDescriptor<BaseRow> accStateDesc =
+ new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+ accState = getRuntimeContext().getState(accStateDesc);
+
+ // input elements are all binary rows as they come from the network
+ BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+ ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+ MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+ "inputState",
+ Types.LONG,
+ rowListTypeInfo);
+ inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+ initCleanupTimeState("RowTimeUnboundedOverCleanupTime");
+ }
+
+ /**
+ * Puts an element from the input stream into state if it is not late.
+ * Registers a timer for the next watermark.
+ *
+ * @param input The input value.
+ * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+ * TimerService for registering timers and querying the time. The
+ * context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ * @throws Exception if reading or writing state fails
+ */
+ @Override
+ public void processElement(
+ BaseRow input,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+ Collector<BaseRow> out) throws Exception {
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+ long timestamp = input.getLong(rowTimeIdx);
+ long curWatermark = ctx.timerService().currentWatermark();
+
+ // discard late record: only rows strictly after the current watermark are buffered
+ if (timestamp > curWatermark) {
+ // ensure every key just registers one timer
+ // the initial watermark is Long.MIN_VALUE; to avoid overflow use 0 while watermark < 0
+ long triggerTs = curWatermark < 0 ? 0 : curWatermark + 1;
+ ctx.timerService().registerEventTimeTimer(triggerTs);
+
+ // put row into state
+ List<BaseRow> rowList = inputState.get(timestamp);
+ if (rowList == null) {
+ rowList = new ArrayList<BaseRow>();
+ }
+ rowList.add(input);
+ inputState.put(timestamp, rowList);
+ }
+ }
+
+ /**
+ * Handles both timer kinds. A processing-time timer is the state-cleanup timer; an event-time
+ * timer emits all buffered rows whose timestamp is not greater than the current watermark.
+ */
+ @Override
+ public void onTimer(
+ long timestamp,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+ Collector<BaseRow> out) throws Exception {
+ if (isProcessingTimeTimer(ctx)) {
+ if (needToCleanupState(timestamp)) {
+
+ // we check whether there are still records which have not been processed yet.
+ // inputState is keyed by row time, so look for ANY buffered key; probing it with the
+ // processing-time timestamp of this cleanup timer would almost never find a match.
+ boolean noRecordsToProcess = !inputState.keys().iterator().hasNext();
+ if (noRecordsToProcess) {
+ // we clean the state
+ cleanupState(inputState, accState);
+ function.cleanup();
+ } else {
+ // There are records left to process because a watermark has not been received yet.
+ // This would only happen if the input stream has stopped. So we don't need to clean up.
+ // We leave the state as it is and schedule a new cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+ }
+ }
+ return;
+ }
+
+ Iterator<Long> keyIterator = inputState.keys().iterator();
+ if (keyIterator.hasNext()) {
+ long curWatermark = ctx.timerService().currentWatermark();
+ boolean existEarlyRecord = false;
+
+ // sort the record timestamps
+ do {
+ Long recordTime = keyIterator.next();
+ // only take timestamps smaller/equal to the watermark
+ if (recordTime <= curWatermark) {
+ insertToSortedList(recordTime);
+ } else {
+ existEarlyRecord = true;
+ }
+ } while (keyIterator.hasNext());
+
+ // get last accumulator
+ BaseRow lastAccumulator = accState.value();
+ if (lastAccumulator == null) {
+ // initialize accumulator
+ lastAccumulator = function.createAccumulators();
+ }
+ // set accumulator in function context first
+ function.setAccumulators(lastAccumulator);
+
+ // emit the rows in order
+ while (!sortedTimestamps.isEmpty()) {
+ Long curTimestamp = sortedTimestamps.removeFirst();
+ List<BaseRow> curRowList = inputState.get(curTimestamp);
+ if (curRowList != null) {
+ // process the rows sharing this timestamp; the mechanism differs between ROWS and RANGE
+ processElementsWithSameTimestamp(curRowList, out);
+ } else {
+ // Ignore the rows of this timestamp if the state has already been cleared.
+ LOG.warn("The state is cleared because of state ttl. " +
+ "This will result in incorrect result. " +
+ "You can increase the state ttl to avoid this.");
+ }
+ inputState.remove(curTimestamp);
+ }
+
+ // update acc state
+ lastAccumulator = function.getAccumulators();
+ accState.update(lastAccumulator);
+
+ // if there are rows with timestamp > watermark, register a timer for the next watermark
+ if (existEarlyRecord) {
+ ctx.timerService().registerEventTimeTimer(curWatermark + 1);
+ }
+ }
+
+ // update cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+ }
+
+ /**
+ * Inserts timestamps in order into a linked list.
+ * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
+ * an append with O(1).
+ */
+ private void insertToSortedList(Long recordTimestamp) {
+ ListIterator<Long> listIterator = sortedTimestamps.listIterator(sortedTimestamps.size());
+ boolean isContinue = true;
+ while (listIterator.hasPrevious() && isContinue) {
+ Long timestamp = listIterator.previous();
+ if (recordTimestamp >= timestamp) {
+ listIterator.next();
+ listIterator.add(recordTimestamp);
+ isContinue = false;
+ }
+ }
+
+ if (isContinue) {
+ sortedTimestamps.addFirst(recordTimestamp);
+ }
+ }
+
+ /**
+ * Processes the rows that share one timestamp. The mechanism differs between
+ * ROWS and RANGE windows.
+ *
+ * @param curRowList the buffered rows of a single row-time timestamp
+ * @param out the collector for emitted results
+ */
+ protected abstract void processElementsWithSameTimestamp(
+ List<BaseRow> curRowList,
+ Collector<BaseRow> out) throws Exception;
+
+ @Override
+ public void close() throws Exception {
+ if (null != function) {
+ function.close();
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java
new file mode 100644
index 0000000..de53101
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function used for the aggregate in bounded proc-time OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ * FROM T.
+ *
+ * <p>Rows are buffered in a map state keyed by the processing time at which they arrived, and a
+ * processing-time timer at (arrival time + 1) emits them. When that timer fires, buffered rows
+ * older than {@code precedingTimeBoundary} are retracted from the accumulators and removed, the
+ * rows of the current millisecond are accumulated, and each of them is emitted with the same
+ * aggregate value.
+ *
+ * @param <K> Type of the key.
+ */
+public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProcTimeRangeBoundedPrecedingFunction.class);
+
+ private final GeneratedAggsHandleFunction genAggsHandler;
+ private final InternalType[] accTypes;
+ private final InternalType[] inputFieldTypes;
+ private final long precedingTimeBoundary;
+
+ // accumulators of the aggregations for the current key
+ private transient ValueState<BaseRow> accState;
+ // buffered input rows, keyed by the processing time at which they arrived
+ private transient MapState<Long, List<BaseRow>> inputState;
+
+ private transient AggsHandleFunction function;
+ // reusable output row joining an input row with the aggregate values
+ private transient JoinedRow output;
+
+ /**
+ * Creates the function.
+ *
+ * @param minRetentionTime minimal state retention time, passed to the cleanup-state base class
+ * @param maxRetentionTime maximal state retention time, passed to the cleanup-state base class
+ * @param genAggsHandler generated code of the aggregation handler
+ * @param accTypes field types of the accumulator row
+ * @param inputFieldTypes field types of the input row
+ * @param precedingTimeBoundary size of the preceding range, in the same unit as processing time
+ */
+ public ProcTimeRangeBoundedPrecedingFunction(
+ long minRetentionTime,
+ long maxRetentionTime,
+ GeneratedAggsHandleFunction genAggsHandler,
+ InternalType[] accTypes,
+ InternalType[] inputFieldTypes,
+ long precedingTimeBoundary) {
+ super(minRetentionTime, maxRetentionTime);
+ this.genAggsHandler = genAggsHandler;
+ this.accTypes = accTypes;
+ this.inputFieldTypes = inputFieldTypes;
+ this.precedingTimeBoundary = precedingTimeBoundary;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+ output = new JoinedRow();
+
+ // input elements are all binary rows as they come from the network
+ BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+ // we keep the elements received in a map state indexed based on their ingestion time
+ ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+ MapStateDescriptor<Long, List<BaseRow>> mapStateDescriptor = new MapStateDescriptor<Long, List<BaseRow>>(
+ "inputState", BasicTypeInfo.LONG_TYPE_INFO, rowListTypeInfo);
+ inputState = getRuntimeContext().getMapState(mapStateDescriptor);
+
+ BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+ ValueStateDescriptor<BaseRow> stateDescriptor =
+ new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+ accState = getRuntimeContext().getState(stateDescriptor);
+
+ initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime");
+ }
+
+ @Override
+ public void processElement(
+ BaseRow input,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+ Collector<BaseRow> out) throws Exception {
+ long currentTime = ctx.timerService().currentProcessingTime();
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, currentTime);
+
+ // buffer the incoming event
+
+ // add current element to the window list of elements with corresponding timestamp
+ List<BaseRow> rowList = inputState.get(currentTime);
+ // null value means that this is the first event received for this timestamp
+ if (rowList == null) {
+ rowList = new ArrayList<BaseRow>();
+ // register timer to process event once the current millisecond passed
+ ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+ }
+ rowList.add(input);
+ inputState.put(currentTime, rowList);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+ Collector<BaseRow> out) throws Exception {
+ if (needToCleanupState(timestamp)) {
+ // clean up and return
+ cleanupState(inputState, accState);
+ function.cleanup();
+ return;
+ }
+
+ // remove timestamp set outside of ProcessFunction.
+ ((TimestampedCollector) out).eraseTimestamp();
+
+ // we consider the original timestamp of events
+ // that have registered this time trigger 1 ms ago
+
+ long currentTime = timestamp - 1;
+
+ // get the list of elements of current proctime
+ List<BaseRow> currentElements = inputState.get(currentTime);
+
+ // Expired clean-up timers pass the needToCleanupState check.
+ // Perform a null check to verify that we have data to process.
+ if (null == currentElements) {
+ return;
+ }
+
+ // initialize the accumulators
+ BaseRow accumulators = accState.value();
+
+ if (null == accumulators) {
+ accumulators = function.createAccumulators();
+ }
+
+ // set accumulators in context first
+ function.setAccumulators(accumulators);
+
+ // update the elements to be removed and retract them from aggregators
+ long limit = currentTime - precedingTimeBoundary;
+
+ // we iterate through all elements in the window buffer based on timestamp keys
+ // when we find timestamps that are out of interest, we retrieve corresponding elements
+ // and eliminate them. Multiple elements could have been received at the same timestamp
+ // the removal of old elements happens only once per proctime as onTimer is called only once
+ Iterator<Long> iter = inputState.keys().iterator();
+ List<Long> markToRemove = new ArrayList<Long>();
+ while (iter.hasNext()) {
+ Long elementKey = iter.next();
+ if (elementKey < limit) {
+ // element key outside of window. Retract values
+ List<BaseRow> elementsRemove = inputState.get(elementKey);
+ if (elementsRemove != null) {
+ for (BaseRow retractRow : elementsRemove) {
+ function.retract(retractRow);
+ }
+ } else {
+ // Does not retract values which are outside of window if the state is cleared already.
+ LOG.warn("The state is cleared because of state ttl. " +
+ "This will result in incorrect result. " +
+ "You can increase the state ttl to avoid this.");
+ }
+
+ // mark element for later removal not to modify the iterator over MapState
+ markToRemove.add(elementKey);
+ }
+ }
+
+ // need to remove in 2 steps not to have concurrent access errors via iterator to the MapState
+ for (Long keyToRemove : markToRemove) {
+ inputState.remove(keyToRemove);
+ }
+
+ // add current elements to aggregator. Multiple elements might
+ // have arrived in the same proctime
+ // the same accumulator value will be computed for all elements
+ for (BaseRow input : currentElements) {
+ function.accumulate(input);
+ }
+
+ // we need to build the output and emit for every event received at this proctime
+ BaseRow aggValue = function.getValue();
+ for (BaseRow input : currentElements) {
+ output.replace(input, aggValue);
+ out.collect(output);
+ }
+
+ // update the value of accumulators for future incremental computation
+ accumulators = function.getAccumulators();
+ accState.update(accumulators);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (null != function) {
+ function.close();
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
new file mode 100644
index 0000000..fb25e1e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROWS clause processing-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+ * FROM T.
+ *
+ * <p>For every key, received rows are buffered in a map state keyed by the processing time at
+ * which they arrived. Once {@code precedingOffset} rows are buffered, every new row first
+ * retracts the oldest buffered row, so the accumulator never covers more than
+ * {@code precedingOffset} rows (the current row included).
+ */
+public class ProcTimeRowsBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ProcTimeRowsBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	// maximum number of buffered rows (the current row included) covered by the accumulator
+	private final long precedingOffset;
+
+	private transient AggsHandleFunction function;
+
+	// accumulators of the rows currently inside the window
+	private transient ValueState<BaseRow> accState;
+	// buffered rows, grouped by the processing time at which they arrived
+	private transient MapState<Long, List<BaseRow>> inputState;
+	// number of buffered rows; stops growing once it reaches precedingOffset
+	private transient ValueState<Long> counterState;
+	// smallest key of inputState, i.e. where the oldest buffered row lives
+	private transient ValueState<Long> smallestTsState;
+
+	private transient JoinedRow output;
+
+	/**
+	 * @param minRetentionTime minimum state retention time passed to the cleanup base class
+	 * @param maxRetentionTime maximum state retention time passed to the cleanup base class
+	 * @param genAggsHandler generated aggregation handler
+	 * @param accTypes types of the accumulator fields
+	 * @param inputFieldTypes types of the input row fields
+	 * @param precedingOffset maximum number of rows kept in the window; must be positive
+	 */
+	public ProcTimeRowsBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset) {
+		super(minRetentionTime, maxRetentionTime);
+		Preconditions.checkArgument(precedingOffset > 0);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		// We keep the elements received in a Map state keyed
+		// by the ingestion time in the operator.
+		// We also keep a counter of processed elements
+		// and the timestamp of the oldest element.
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> mapStateDescriptor = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState", BasicTypeInfo.LONG_TYPE_INFO, rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(mapStateDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> stateDescriptor =
+			new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(stateDescriptor);
+
+		ValueStateDescriptor<Long> processedCountDescriptor = new ValueStateDescriptor<Long>(
+			"processedCountState",
+			Types.LONG);
+		counterState = getRuntimeContext().getState(processedCountDescriptor);
+
+		ValueStateDescriptor<Long> smallestTimestampDescriptor = new ValueStateDescriptor<Long>(
+			"smallestTSState",
+			Types.LONG);
+		smallestTsState = getRuntimeContext().getState(smallestTimestampDescriptor);
+
+		initCleanupTimeState("ProcTimeBoundedRowsOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		long currentTime = ctx.timerService().currentProcessingTime();
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, currentTime);
+
+		// initialize state for the processed element
+		BaseRow accumulators = accState.value();
+		if (accumulators == null) {
+			accumulators = function.createAccumulators();
+		}
+		// set accumulators in context first
+		function.setAccumulators(accumulators);
+
+		// get smallest timestamp
+		Long smallestTs = smallestTsState.value();
+		if (smallestTs == null) {
+			smallestTs = currentTime;
+			smallestTsState.update(smallestTs);
+		}
+		// get previous counter value
+		Long counter = counterState.value();
+		if (counter == null) {
+			counter = 0L;
+		}
+
+		if (counter == precedingOffset) {
+			// the buffer is full: evict the oldest element before adding the current one
+			List<BaseRow> retractList = inputState.get(smallestTs);
+			if (retractList != null) {
+				// get oldest element beyond buffer size
+				// and if oldest element exist, retract value
+				BaseRow retractRow = retractList.get(0);
+				function.retract(retractRow);
+				retractList.remove(0);
+			} else {
+				// Does not retract values which are outside of window if the state is cleared already.
+				LOG.warn("The state is cleared because of state ttl. " +
+					"This will result in incorrect result. " +
+					"You can increase the state ttl to avoid this.");
+			}
+			if (retractList != null && !retractList.isEmpty()) {
+				// if reference timestamp list not empty, keep the list
+				inputState.put(smallestTs, retractList);
+			} else {
+				// if smallest timestamp list is empty, remove it and find the new smallest
+				inputState.remove(smallestTs);
+				smallestTsState.update(findSmallestTimestamp(currentTime));
+			}
+		} else {
+			// we update the counter only while buffer is getting filled
+			counter += 1;
+			counterState.update(counter);
+		}
+
+		// update map state, counter and timestamp
+		List<BaseRow> currentTimeState = inputState.get(currentTime);
+		if (currentTimeState != null) {
+			currentTimeState.add(input);
+			inputState.put(currentTime, currentTimeState);
+		} else { // add new input
+			List<BaseRow> newList = new ArrayList<BaseRow>();
+			newList.add(input);
+			inputState.put(currentTime, newList);
+		}
+
+		// accumulate current row
+		function.accumulate(input);
+		// update the value of accumulators for future incremental computation
+		accumulators = function.getAccumulators();
+		accState.update(accumulators);
+
+		// prepare output row
+		BaseRow aggValue = function.getValue();
+		output.replace(input, aggValue);
+		out.collect(output);
+	}
+
+	/**
+	 * Returns the smallest timestamp that {@code inputState} will contain once the current
+	 * element has been stored under {@code currentTime}.
+	 *
+	 * <p>The search starts from {@code currentTime} instead of {@code Long.MAX_VALUE}. When the
+	 * buffer has just become empty (e.g. {@code precedingOffset == 1}), the only remaining row is
+	 * the current one, so its timestamp must become the new smallest timestamp. Otherwise the
+	 * next element would look up a non-existent entry and skip the required retraction.
+	 */
+	private long findSmallestTimestamp(long currentTime) throws Exception {
+		Iterator<Long> iter = inputState.keys().iterator();
+		long newSmallestTs = currentTime;
+		while (iter.hasNext()) {
+			long ts = iter.next();
+			if (ts < newSmallestTs) {
+				newSmallestTs = ts;
+			}
+		}
+		return newSmallestTs;
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (needToCleanupState(timestamp)) {
+			cleanupState(inputState, accState, counterState, smallestTsState);
+			function.cleanup();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..50b0b78
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+/**
+ * Process Function for processing-time unbounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class ProcTimeUnboundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+ private static final long serialVersionUID = 1L;
+
+ private final GeneratedAggsHandleFunction genAggsHandler;
+ private final InternalType[] accTypes;
+
+ private transient AggsHandleFunction function;
+ private transient ValueState<BaseRow> accState;
+ private transient JoinedRow output;
+
+ public ProcTimeUnboundedPrecedingFunction(
+ long minRetentionTime,
+ long maxRetentionTime,
+ GeneratedAggsHandleFunction genAggsHandler,
+ InternalType[] accTypes) {
+ super(minRetentionTime, maxRetentionTime);
+ this.genAggsHandler = genAggsHandler;
+ this.accTypes = accTypes;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+ output = new JoinedRow();
+
+ BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+ ValueStateDescriptor<BaseRow> stateDescriptor =
+ new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+ accState = getRuntimeContext().getState(stateDescriptor);
+
+ initCleanupTimeState("ProcTimeUnboundedOverCleanupTime");
+ }
+
+ @Override
+ public void processElement(
+ BaseRow input,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+ Collector<BaseRow> out) throws Exception {
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+ BaseRow accumulators = accState.value();
+ if (null == accumulators) {
+ accumulators = function.createAccumulators();
+ }
+ // set accumulators in context first
+ function.setAccumulators(accumulators);
+
+ // accumulate input row
+ function.accumulate(input);
+
+ // update the value of accumulators for future incremental computation
+ accumulators = function.getAccumulators();
+ accState.update(accumulators);
+
+ // prepare output row
+ BaseRow aggValue = function.getValue();
+ output.replace(input, aggValue);
+ out.collect(output);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+ Collector<BaseRow> out) throws Exception {
+ if (needToCleanupState(timestamp)) {
+ cleanupState(accState);
+ function.cleanup();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (null != function) {
+ function.close();
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
new file mode 100644
index 0000000..8133d2f
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ * FROM T.
+ *
+ * <p>Incoming rows are buffered per row time and an event-time timer is registered for each new
+ * row time. When that timer fires, rows older than {@code precedingOffset} are retracted, the
+ * rows of the fired timestamp are accumulated, and each of them is emitted with the same
+ * aggregate value. Rows whose row time is not greater than the last fired timestamp are dropped.
+ */
+public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRangeBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final long precedingOffset;
+	private final int rowTimeIdx;
+
+	private transient JoinedRow output;
+
+	// the state which keeps the last triggering timestamp
+	private transient ValueState<Long> lastTriggeringTsState;
+
+	// the state which used to materialize the accumulator for incremental calculation
+	private transient ValueState<BaseRow> accState;
+
+	// the state which keeps all the data that are not expired.
+	// The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+	// the second element of tuple is a list that contains the entire data of all the rows belonging
+	// to this time stamp.
+	private transient MapState<Long, List<BaseRow>> inputState;
+
+	private transient AggsHandleFunction function;
+
+	public RowTimeRangeBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime);
+		Preconditions.checkNotNull(precedingOffset);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+		this.rowTimeIdx = rowTimeIdx;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		ValueStateDescriptor<Long> lastTriggeringTsDescriptor = new ValueStateDescriptor<Long>(
+			"lastTriggeringTsState",
+			Types.LONG);
+		lastTriggeringTsState = getRuntimeContext().getState(lastTriggeringTsDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> accStateDesc = new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(accStateDesc);
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState",
+			Types.LONG,
+			rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+		initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(rowTimeIdx);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// check if the data is expired, if not, save the data and register event time timer
+		if (triggeringTs > lastTriggeringTs) {
+			List<BaseRow> data = inputState.get(triggeringTs);
+			if (null != data) {
+				data.add(input);
+				inputState.put(triggeringTs, data);
+			} else {
+				data = new ArrayList<BaseRow>();
+				data.add(input);
+				inputState.put(triggeringTs, data);
+				// register event time timer
+				ctx.timerService().registerEventTimeTimer(triggeringTs);
+			}
+		}
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (isProcessingTimeTimer(ctx)) {
+			// NOTE(review): the cleanup timer must not be re-registered before this check.
+			// registerProcessingCleanupTimer re-registers (and remembers a later cleanup time)
+			// whenever currentTime + minRetentionTime exceeds the remembered one, which is always
+			// the case when the cleanup timer itself fires, so needToCleanupState would never
+			// return true and the state would never be cleaned up.
+			if (needToCleanupState(timestamp)) {
+
+				Iterator<Long> keysIt = inputState.keys().iterator();
+				Long lastProcessedTime = lastTriggeringTsState.value();
+				if (lastProcessedTime == null) {
+					lastProcessedTime = 0L;
+				}
+
+				// is data left which has not been processed yet?
+				boolean noRecordsToProcess = true;
+				while (keysIt.hasNext() && noRecordsToProcess) {
+					if (keysIt.next() > lastProcessedTime) {
+						noRecordsToProcess = false;
+					}
+				}
+
+				if (noRecordsToProcess) {
+					// we clean the state
+					cleanupState(inputState, accState, lastTriggeringTsState);
+					function.cleanup();
+				} else {
+					// There are records left to process because a watermark has not been received yet.
+					// This would only happen if the input stream has stopped. So we don't need to clean up.
+					// We leave the state as it is and schedule a new cleanup timer
+					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+				}
+			}
+			return;
+		}
+
+		// gets all window data from state for the calculation
+		List<BaseRow> inputs = inputState.get(timestamp);
+
+		if (null != inputs) {
+
+			int dataListIndex = 0;
+			BaseRow accumulators = accState.value();
+
+			// initialize when first run or failover recovery per key
+			if (null == accumulators) {
+				accumulators = function.createAccumulators();
+			}
+			// set accumulators in context first
+			function.setAccumulators(accumulators);
+
+			// keep up timestamps of retract data
+			List<Long> retractTsList = new ArrayList<Long>();
+
+			// do retraction
+			Iterator<Long> dataTimestampIt = inputState.keys().iterator();
+			while (dataTimestampIt.hasNext()) {
+				Long dataTs = dataTimestampIt.next();
+				long offset = timestamp - dataTs;
+				if (offset > precedingOffset) {
+					List<BaseRow> retractDataList = inputState.get(dataTs);
+					if (retractDataList != null) {
+						dataListIndex = 0;
+						while (dataListIndex < retractDataList.size()) {
+							BaseRow retractRow = retractDataList.get(dataListIndex);
+							function.retract(retractRow);
+							dataListIndex += 1;
+						}
+						retractTsList.add(dataTs);
+					} else {
+						// Does not retract values which are outside of window if the state is cleared already.
+						LOG.warn("The state is cleared because of state ttl. " +
+							"This will result in incorrect result. " +
+							"You can increase the state ttl to avoid this.");
+					}
+				}
+			}
+
+			// do accumulation
+			dataListIndex = 0;
+			while (dataListIndex < inputs.size()) {
+				BaseRow curRow = inputs.get(dataListIndex);
+				// accumulate current row
+				function.accumulate(curRow);
+				dataListIndex += 1;
+			}
+
+			// get aggregate result
+			BaseRow aggValue = function.getValue();
+
+			// copy forwarded fields to output row and emit output row
+			dataListIndex = 0;
+			while (dataListIndex < inputs.size()) {
+				BaseRow curRow = inputs.get(dataListIndex);
+				output.replace(curRow, aggValue);
+				out.collect(output);
+				dataListIndex += 1;
+			}
+
+			// remove the data that has been retracted (not done while iterating over the MapState keys)
+			dataListIndex = 0;
+			while (dataListIndex < retractTsList.size()) {
+				inputState.remove(retractTsList.get(dataListIndex));
+				dataListIndex += 1;
+			}
+
+			// update the value of accumulators for future incremental computation
+			accumulators = function.getAccumulators();
+			accState.update(accumulators);
+		}
+		lastTriggeringTsState.update(timestamp);
+
+		// update cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..450dd56
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * A ProcessFunction to support unbounded RANGE window.
+ * The RANGE option includes all the rows within the window frame
+ * that have the same ORDER BY values as the current row.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRangeUnboundedPrecedingFunction<K> extends AbstractRowTimeUnboundedPrecedingOver<K> {
+	private static final long serialVersionUID = 1L;
+
+	public RowTimeRangeUnboundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime, genAggsHandler, accTypes, inputFieldTypes, rowTimeIdx);
+	}
+
+	/**
+	 * Folds every row of one timestamp into the accumulators before emitting any of them. Rows
+	 * that share a timestamp are peers under RANGE semantics, so each is emitted with the same
+	 * aggregate value.
+	 */
+	@Override
+	public void processElementsWithSameTimestamp(
+			List<BaseRow> curRowList,
+			Collector<BaseRow> out) throws Exception {
+		for (BaseRow row : curRowList) {
+			function.accumulate(row);
+		}
+
+		BaseRow sharedAggValue = function.getValue();
+		for (BaseRow row : curRowList) {
+			output.replace(row, sharedAggValue);
+			out.collect(output);
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
new file mode 100644
index 0000000..c0ccd38
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window.
+ *
+ * <p>Input rows are buffered in keyed state, grouped by their event timestamp, and an event-time
+ * timer is registered for every new timestamp. When such a timer fires, the buffered rows of that
+ * timestamp are accumulated one after another. Once {@code precedingOffset} rows are inside the
+ * window, the oldest buffered row is retracted before each further row is accumulated, so every
+ * emitted aggregate covers at most {@code precedingOffset} rows (the current row included).
+ * Rows whose timestamp is not larger than the last processed timestamp (0 before the first
+ * event-time timer fired) are dropped.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRowsBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRowsBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	// maximal number of rows aggregated for one output row, the current row included
+	private final long precedingOffset;
+	// index of the event-time attribute in the input row
+	private final int rowTimeIdx;
+
+	// output row reused for every emitted record: input row joined with the aggregate value
+	private transient JoinedRow output;
+
+	// the state which keeps the last triggering timestamp
+	private transient ValueState<Long> lastTriggeringTsState;
+
+	// the state which keeps the number of rows inside the window (never exceeds precedingOffset)
+	private transient ValueState<Long> counterState;
+
+	// the state which used to materialize the accumulator for incremental calculation
+	private transient ValueState<BaseRow> accState;
+
+	// the state which keeps all the data that are not expired.
+	// The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+	// the second element of tuple is a list that contains the entire data of all the rows belonging
+	// to this time stamp.
+	private transient MapState<Long, List<BaseRow>> inputState;
+
+	private transient AggsHandleFunction function;
+
+	/**
+	 * @param minRetentionTime minimal state retention time, forwarded to the cleanup-state base class
+	 * @param maxRetentionTime maximal state retention time, forwarded to the cleanup-state base class
+	 * @param genAggsHandler generated aggregate handler
+	 * @param accTypes field types of the accumulator row
+	 * @param inputFieldTypes field types of the input row
+	 * @param precedingOffset maximal number of rows aggregated for one output row, current row
+	 *                        included; must be positive
+	 * @param rowTimeIdx index of the event-time attribute in the input row
+	 * @throws IllegalArgumentException if {@code precedingOffset} is not positive
+	 */
+	public RowTimeRowsBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime);
+		// the window always contains the current row, so a non-positive size is meaningless
+		Preconditions.checkArgument(
+			precedingOffset > 0,
+			"precedingOffset must be positive, but was %s.",
+			precedingOffset);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+		this.rowTimeIdx = rowTimeIdx;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		ValueStateDescriptor<Long> lastTriggeringTsDescriptor = new ValueStateDescriptor<Long>(
+			"lastTriggeringTsState",
+			Types.LONG);
+		lastTriggeringTsState = getRuntimeContext().getState(lastTriggeringTsDescriptor);
+
+		ValueStateDescriptor<Long> dataCountStateDescriptor = new ValueStateDescriptor<Long>(
+			"processedCountState",
+			Types.LONG);
+		counterState = getRuntimeContext().getState(dataCountStateDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> accStateDesc = new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(accStateDesc);
+
+		// input element are all binary row as they are came from network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState",
+			Types.LONG,
+			rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+		initCleanupTimeState("RowTimeBoundedRowsOverCleanupTime");
+	}
+
+	/**
+	 * Buffers the row under its event timestamp. The first row of a timestamp registers the
+	 * event-time timer that will later process all rows of that timestamp.
+	 */
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(rowTimeIdx);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// rows at or before the last processed timestamp are late and dropped
+		if (triggeringTs <= lastTriggeringTs) {
+			return;
+		}
+
+		List<BaseRow> data = inputState.get(triggeringTs);
+		if (data == null) {
+			data = new ArrayList<>();
+			// first row of this timestamp: register the event-time timer that will process it
+			ctx.timerService().registerEventTimeTimer(triggeringTs);
+		}
+		data.add(input);
+		inputState.put(triggeringTs, data);
+	}
+
+	/**
+	 * Processing-time timers clean up idle state. Event-time timers compute and emit the
+	 * aggregates for all rows buffered under {@code timestamp}.
+	 */
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (isProcessingTimeTimer(ctx)) {
+			if (needToCleanupState(timestamp)) {
+				if (hasUnprocessedRecords()) {
+					// There are records left to process because a watermark has not been received yet.
+					// This would only happen if the input stream has stopped. So we don't need to clean up.
+					// We leave the state as it is and schedule a new cleanup timer
+					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+				} else {
+					// We clean the state
+					cleanupState(inputState, accState, counterState, lastTriggeringTsState);
+					function.cleanup();
+				}
+			}
+			return;
+		}
+
+		// gets all window data from state for the calculation
+		List<BaseRow> inputs = inputState.get(timestamp);
+
+		if (null != inputs) {
+
+			Long dataCount = counterState.value();
+			if (dataCount == null) {
+				dataCount = 0L;
+			}
+
+			BaseRow accumulators = accState.value();
+			if (accumulators == null) {
+				accumulators = function.createAccumulators();
+			}
+			// set accumulators in context first
+			function.setAccumulators(accumulators);
+
+			// oldest buffered rows currently being retracted; only meaningful while non-null
+			List<BaseRow> retractList = null;
+			// timestamp under which retractList is stored in inputState
+			long retractTs = Long.MAX_VALUE;
+			// number of rows of retractList retracted so far
+			int retractCnt = 0;
+
+			for (BaseRow input : inputs) {
+				BaseRow retractRow = null;
+				if (dataCount >= precedingOffset) {
+					if (null == retractList) {
+						Long oldestTs = findOldestTimestamp();
+						if (oldestTs != null) {
+							retractTs = oldestTs;
+							// get the oldest rows to retract them (a single state read)
+							retractList = inputState.get(oldestTs);
+						}
+					}
+
+					if (retractList != null) {
+						retractRow = retractList.get(retractCnt);
+						retractCnt += 1;
+
+						// remove retracted values from state
+						if (retractList.size() == retractCnt) {
+							inputState.remove(retractTs);
+							retractList = null;
+							retractCnt = 0;
+						}
+					}
+				} else {
+					dataCount += 1;
+				}
+
+				// retract old row from accumulators
+				if (null != retractRow) {
+					function.retract(retractRow);
+				}
+
+				// accumulate current row
+				function.accumulate(input);
+
+				// prepare output row
+				output.replace(input, function.getValue());
+				out.collect(output);
+			}
+
+			// a non-null retractList is still stored in state and had at least one row retracted:
+			// persist it without the rows that left the window
+			if (retractList != null) {
+				retractList.subList(0, retractCnt).clear();
+				inputState.put(retractTs, retractList);
+			}
+			counterState.update(dataCount);
+			// update the value of accumulators for future incremental computation
+			accState.update(function.getAccumulators());
+		}
+
+		lastTriggeringTsState.update(timestamp);
+
+		// update cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+	}
+
+	/**
+	 * Returns the smallest timestamp that still has buffered rows, or {@code null} if nothing is
+	 * buffered.
+	 */
+	private Long findOldestTimestamp() throws Exception {
+		Long oldestTs = null;
+		for (Long dataTs : inputState.keys()) {
+			if (oldestTs == null || dataTs < oldestTs) {
+				oldestTs = dataTs;
+			}
+		}
+		return oldestTs;
+	}
+
+	/**
+	 * Returns whether rows newer than the last processed timestamp are still buffered, i.e. rows
+	 * whose event-time timer has not fired yet.
+	 */
+	private boolean hasUnprocessedRecords() throws Exception {
+		Long lastProcessedTime = lastTriggeringTsState.value();
+		if (lastProcessedTime == null) {
+			lastProcessedTime = 0L;
+		}
+		Iterator<Long> keysIt = inputState.keys().iterator();
+		while (keysIt.hasNext()) {
+			if (keysIt.next() > lastProcessedTime) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..1d0a26d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Event-time OVER aggregation for {@code ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW}.
+ * ROWS frames are defined physically: every row is emitted with the aggregate over all rows
+ * accumulated before it plus itself, even when several rows share the same timestamp.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRowsUnboundedPrecedingFunction<K> extends AbstractRowTimeUnboundedPrecedingOver<K> {
+	private static final long serialVersionUID = 1L;
+
+	public RowTimeRowsUnboundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime, genAggsHandler, accTypes, inputFieldTypes, rowTimeIdx);
+	}
+
+	/**
+	 * Accumulates the rows of one timestamp in list order and emits each row right after it was
+	 * added, so later rows of the same timestamp see a larger frame.
+	 */
+	@Override
+	public void processElementsWithSameTimestamp(
+			List<BaseRow> curRowList,
+			Collector<BaseRow> out) throws Exception {
+		for (BaseRow row : curRowList) {
+			function.accumulate(row);
+			// join the row with the running aggregate and emit it immediately
+			output.replace(row, function.getValue());
+			out.collect(output);
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
index 3397817..273ab5f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -161,6 +161,10 @@ public class TypeConverters {
ObjectArrayTypeInfo arrayType = (ObjectArrayTypeInfo) typeInfo;
return InternalTypes.createArrayType(
createInternalTypeFromTypeInfo(arrayType.getComponentInfo()));
+ } else if (typeInfo instanceof MultisetTypeInfo) {
+ MultisetTypeInfo multisetType = (MultisetTypeInfo) typeInfo;
+ return InternalTypes.createMultisetType(
+ createInternalTypeFromTypeInfo(multisetType.getElementTypeInfo()));
} else if (typeInfo instanceof MapTypeInfo) {
MapTypeInfo mapType = (MapTypeInfo) typeInfo;
return InternalTypes.createMapType(
@@ -236,20 +240,19 @@ public class TypeConverters {
} else if (type instanceof ArrayType) {
return ObjectArrayTypeInfo.getInfoFor(
createExternalTypeInfoFromInternalType(((ArrayType) type).getElementType()));
+ } else if (type instanceof MultisetType) {
+ MultisetType multisetType = (MultisetType) type;
+ return MultisetTypeInfo.getInfoFor(
+ createExternalTypeInfoFromInternalType(multisetType.getElementType()));
} else if (type instanceof MapType) {
MapType mapType = (MapType) type;
return new MapTypeInfo(
createExternalTypeInfoFromInternalType(mapType.getKeyType()),
createExternalTypeInfoFromInternalType(mapType.getValueType()));
- } else if (type instanceof MultisetType) {
- MultisetType multisetType = (MultisetType) type;
- return MultisetTypeInfo.getInfoFor(
- createExternalTypeInfoFromInternalType(multisetType.getElementType()));
- }
- else if (type instanceof DecimalType) {
+ } else if (type instanceof DecimalType) {
DecimalType decimalType = (DecimalType) type;
return new BigDecimalTypeInfo(decimalType.precision(), decimalType.scale());
- } else if (type instanceof GenericType) {
+ } else if (type instanceof GenericType) {
GenericType genericType = (GenericType) type;
return genericType.getTypeInfo();
} else {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
index 392e062..c95d547 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
@@ -47,15 +47,15 @@ public class BaseRowHarnessAssertor {
private final TypeInformation[] typeInfos;
private final Comparator<GenericRow> comparator;
- public BaseRowHarnessAssertor(TypeInformation[] typeInfos) {
- this(typeInfos, null);
- }
-
public BaseRowHarnessAssertor(TypeInformation[] typeInfos, Comparator<GenericRow> comparator) {
this.typeInfos = typeInfos;
this.comparator = comparator;
}
+ /**
+ * Creates an assertor for rows of the given field types that uses a comparator ordering rows
+ * by their {@code toString()} representation.
+ */
+ public BaseRowHarnessAssertor(TypeInformation[] typeInfos) {
+ this(typeInfos, new StringComparator());
+ }
+
/**
* Compare the two queues containing operator/task output by converting them to an array first.
@@ -73,9 +73,9 @@ public class BaseRowHarnessAssertor {
* comparator. Assertes two sorted converted array should be same.
*/
public void assertOutputEqualsSorted(
- String message,
- Collection<Object> expected,
- Collection<Object> actual) {
+ String message,
+ Collection<Object> expected,
+ Collection<Object> actual) {
assertOutputEquals(message, expected, actual, true);
}
@@ -133,4 +133,10 @@ public class BaseRowHarnessAssertor {
Assert.assertArrayEquals(message, sortedExpected, sortedActual);
}
+ private static class StringComparator implements Comparator<GenericRow> {
+ @Override
+ public int compare(GenericRow o1, GenericRow o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 91eff9e..bd80dfc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -20,8 +20,16 @@ package org.apache.flink.table.runtime.util;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.BinaryRowWriter;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.BinaryWriter;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+
+import java.util.Date;
import static org.apache.flink.table.dataformat.BinaryString.fromString;
@@ -83,6 +91,43 @@ public class StreamRecordUtils {
return GenericRow.of(objects);
}
+	/**
+	 * Receives an object array and generates a BinaryRow based on the array.
+	 *
+	 * <p>Supported field values are {@code null}, Integer, Long, Short, Byte, Float, Double,
+	 * Boolean, String, BinaryString and java.util.Date (written through BinaryWriter with the
+	 * DATE type).
+	 *
+	 * @param fields input object array
+	 * @return generated BinaryRow.
+	 * @throws IllegalArgumentException if a field value has an unsupported type
+	 */
+	public static BinaryRow binaryrow(Object... fields) {
+		BinaryRow row = new BinaryRow(fields.length);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		for (int j = 0; j < fields.length; j++) {
+			Object value = fields[j];
+			if (value == null) {
+				writer.setNullAt(j);
+			} else if (value instanceof Integer) {
+				writer.writeInt(j, (Integer) value);
+			} else if (value instanceof String) {
+				writer.writeString(j, BinaryString.fromString((String) value));
+			} else if (value instanceof BinaryString) {
+				writer.writeString(j, (BinaryString) value);
+			} else if (value instanceof Double) {
+				writer.writeDouble(j, (Double) value);
+			} else if (value instanceof Float) {
+				writer.writeFloat(j, (Float) value);
+			} else if (value instanceof Long) {
+				writer.writeLong(j, (Long) value);
+			} else if (value instanceof Short) {
+				writer.writeShort(j, (Short) value);
+			} else if (value instanceof Byte) {
+				writer.writeByte(j, (Byte) value);
+			} else if (value instanceof Boolean) {
+				writer.writeBoolean(j, (Boolean) value);
+			} else if (value instanceof Date) {
+				// NOTE(review): BinaryWriter.write with the DATE type may expect an int day count
+				// rather than a java.util.Date instance - confirm before relying on this branch.
+				InternalType internalType = InternalTypes.DATE;
+				BinaryWriter.write(writer, j, value, internalType);
+			} else {
+				// fail loudly: silently writing NULL would hide a broken test fixture
+				throw new IllegalArgumentException(
+					"Unsupported field type " + value.getClass().getName() + " at position " + j + ".");
+			}
+		}
+
+		writer.complete();
+		return row;
+	}
+
private StreamRecordUtils() {
// deprecate default constructor
}