Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 07:53:41 UTC
flink git commit: [FLINK-5655] [table] Add event-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.
Repository: flink
Updated Branches:
refs/heads/master ca681101f -> d4665a00a
[FLINK-5655] [table] Add event-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.
This closes #3629.
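For reference, the kind of query this change enables (adapted from the new
ITCases added below; table "T1" and the RowTime() marker are set up by the
tests):

    val sqlQuery = "SELECT c, b, " +
      "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN " +
      "INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
      "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN " +
      "INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
      "FROM T1"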
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4665a00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4665a00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4665a00
Branch: refs/heads/master
Commit: d4665a00a4262f89b166895f73a54daab2f25e1c
Parents: ca68110
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Tue Mar 28 12:36:03 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 30 09:25:17 2017 +0200
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 17 +-
.../table/runtime/aggregate/AggregateUtil.scala | 34 ++-
.../RangeClauseBoundedOverProcessFunction.scala | 221 +++++++++++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 144 ++++++++++++
.../scala/stream/sql/WindowAggregateTest.scala | 55 +++++
5 files changed, 455 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 01e5a9a..7b744f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -139,11 +139,16 @@ class DataStreamOverAggregate(
// bounded OVER window
if (overWindow.isRows) {
// ROWS clause bounded OVER window
- createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, isRowTimeType = true)
+ createBoundedAndCurrentRowOverWindow(
+ inputDS,
+ isRangeClause = false,
+ isRowTimeType = true)
} else {
// RANGE clause bounded OVER window
- throw new TableException(
- "row-time OVER RANGE PRECEDING window is not supported yet.")
+ createBoundedAndCurrentRowOverWindow(
+ inputDS,
+ isRangeClause = true,
+ isRowTimeType = true)
}
} else {
throw new TableException(
@@ -195,8 +200,9 @@ class DataStreamOverAggregate(
result
}
- def createRowsClauseBoundedAndCurrentRowOverWindow(
+ def createBoundedAndCurrentRowOverWindow(
inputDS: DataStream[Row],
+ isRangeClause: Boolean = false,
isRowTimeType: Boolean = false): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
@@ -209,10 +215,11 @@ class DataStreamOverAggregate(
// get the output types
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
- val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
+ val processFunction = AggregateUtil.createBoundedOverProcessFunction(
namedAggregates,
inputType,
precedingOffset,
+ isRangeClause,
isRowTimeType
)
val result: DataStream[Row] =
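Note the semantic difference the new branch covers: ROWS bounds count
physical rows back from the current row, while RANGE bounds are defined on
the ORDER BY value, here the event time. For example, with rows at
timestamps 1000, 1500 and 1500 and a bound of INTERVAL '1' SECOND PRECEDING,
the aggregate emitted at timestamp 1500 covers all three rows (1000 lies
within 1500 - 1000), whereas ROWS BETWEEN 1 PRECEDING AND CURRENT ROW would
cover only two. This is also why, under the RANGE clause, rows with
identical timestamps must share one aggregate value.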
http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index fdac692..cbb2e53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -91,20 +91,21 @@ object AggregateUtil {
}
/**
- * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
* a bounded OVER window to evaluate the final aggregate value.
*
* @param namedAggregates List of calls to aggregate functions and their output field names
* @param inputType Input row type
- * @param inputFields All input fields
* @param precedingOffset the preceding offset
+ * @param isRangeClause Flag indicating whether the OVER clause is a RANGE clause
* @param isRowTimeType Flag indicating whether the time type is row time
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
- private[flink] def createRowsClauseBoundedOverProcessFunction(
+ private[flink] def createBoundedOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
precedingOffset: Long,
+ isRangeClause: Boolean,
isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
val (aggFields, aggregates) =
@@ -117,14 +118,25 @@ object AggregateUtil {
val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
if (isRowTimeType) {
- new RowsClauseBoundedOverProcessFunction(
- aggregates,
- aggFields,
- inputType.getFieldCount,
- aggregationStateType,
- inputRowType,
- precedingOffset
- )
+ if (isRangeClause) {
+ new RangeClauseBoundedOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ inputRowType,
+ precedingOffset
+ )
+ } else {
+ new RowsClauseBoundedOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ inputRowType,
+ precedingOffset
+ )
+ }
} else {
throw TableException(
"Bounded partitioned proc-time OVER aggregation is not supported yet.")
http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
new file mode 100644
index 0000000..0c8555b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process function for a RANGE clause event-time bounded OVER window.
+ *
+ * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType the row type info of input row
+ * @param precedingOffset the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val aggregationStateType: RowTypeInfo,
+ private val inputRowType: RowTypeInfo,
+ private val precedingOffset: Long)
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkNotNull(forwardedFieldCount)
+ Preconditions.checkNotNull(aggregationStateType)
+ Preconditions.checkNotNull(precedingOffset)
+
+ private var output: Row = _
+
+ // the state which keeps the last triggering timestamp
+ private var lastTriggeringTsState: ValueState[Long] = _
+
+ // the state which is used to materialize the accumulator for incremental calculation
+ private var accumulatorState: ValueState[Row] = _
+
+ // the state which keeps all the data that is not expired.
+ // The map key is the timestamp of a row. For each timestamp, the value is
+ // a list holding all rows that arrived with that timestamp.
+ private var dataState: MapState[Long, JList[Row]] = _
+
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ // timestamp of the current record; it triggers the calculation
+ val triggeringTs = ctx.timestamp
+
+ val lastTriggeringTs = lastTriggeringTsState.value
+
+ // drop expired (late) data; otherwise save the record and register an event-time timer
+ if (triggeringTs > lastTriggeringTs) {
+ val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new JArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
+ }
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+ // gets all window data from state for the calculation
+ val inputs: JList[Row] = dataState.get(timestamp)
+
+ if (null != inputs) {
+
+ var accumulators = accumulatorState.value
+ var dataListIndex = 0
+ var aggregatesIndex = 0
+
+ // initialize when first run or failover recovery per key
+ if (null == accumulators) {
+ accumulators = new Row(aggregates.length)
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+ aggregatesIndex += 1
+ }
+ }
+
+ // collect the timestamps of the rows to be retracted
+ val retractTsList: JList[Long] = new JArrayList[Long]
+
+ // do retraction
+ val dataTimestampIt = dataState.keys.iterator
+ while (dataTimestampIt.hasNext) {
+ val dataTs: Long = dataTimestampIt.next()
+ val offset = timestamp - dataTs
+ if (offset > precedingOffset) {
+ val retractDataList = dataState.get(dataTs)
+ dataListIndex = 0
+ while (dataListIndex < retractDataList.size()) {
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex)
+ .retract(accumulator, retractDataList.get(dataListIndex)
+ .getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }
+ dataListIndex += 1
+ }
+ retractTsList.add(dataTs)
+ }
+ }
+
+ // do accumulation
+ dataListIndex = 0
+ while (dataListIndex < inputs.size()) {
+ // accumulate current row
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex)
+ .getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }
+ dataListIndex += 1
+ }
+
+ // set aggregate in output row
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val index = forwardedFieldCount + aggregatesIndex
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ output.setField(index, aggregates(aggregatesIndex).getValue(accumulator))
+ aggregatesIndex += 1
+ }
+
+ // copy forwarded fields to output row and emit output row
+ dataListIndex = 0
+ while (dataListIndex < inputs.size()) {
+ aggregatesIndex = 0
+ while (aggregatesIndex < forwardedFieldCount) {
+ output.setField(aggregatesIndex, inputs.get(dataListIndex).getField(aggregatesIndex))
+ aggregatesIndex += 1
+ }
+ out.collect(output)
+ dataListIndex += 1
+ }
+
+ // remove the data that has been retracted
+ dataListIndex = 0
+ while (dataListIndex < retractTsList.size) {
+ dataState.remove(retractTsList.get(dataListIndex))
+ dataListIndex += 1
+ }
+
+ // update state
+ accumulatorState.update(accumulators)
+ lastTriggeringTsState.update(timestamp)
+ }
+ }
+}
+
+
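A worked example of the retract/accumulate cycle, grounded in the
partitioned ITCase below (precedingOffset = 1000): when the timer for
timestamp 3000 fires for partition "Hello", dataState holds one row each at
timestamps 1000, 1500 and 1600 (a = 1) plus three rows at 2000 (a = 2).
The timestamps 1000, 1500 and 1600 satisfy 3000 - ts > 1000 and are
retracted; the rows at 2000 stay in range, the new row at 3000 (a = 3) is
accumulated, and that row is emitted with count 4 and
sum 2 + 2 + 2 + 3 = 9, matching the expected result "Hello,3,4,9".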
http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 80ff42a..b8285a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -411,6 +411,150 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
+ val data = Seq(
+ Left((1500L, (1L, 15, "Hello"))),
+ Left((1600L, (1L, 16, "Hello"))),
+ Left((1000L, (1L, 1, "Hello"))),
+ Left((2000L, (2L, 2, "Hello"))),
+ Right(1000L),
+ Left((2000L, (2L, 2, "Hello"))),
+ Left((2000L, (2L, 3, "Hello"))),
+ Left((3000L, (3L, 3, "Hello"))),
+ Right(2000L),
+ Left((4000L, (4L, 4, "Hello"))),
+ Right(3000L),
+ Left((5000L, (5L, 5, "Hello"))),
+ Right(5000L),
+ Left((6000L, (6L, 6, "Hello"))),
+ Left((6500L, (6L, 65, "Hello"))),
+ Right(7000L),
+ Left((9000L, (6L, 9, "Hello"))),
+ Left((9500L, (6L, 18, "Hello"))),
+ Left((9000L, (6L, 9, "Hello"))),
+ Right(10000L),
+ Left((10000L, (7L, 7, "Hello World"))),
+ Left((11000L, (7L, 17, "Hello World"))),
+ Left((11000L, (7L, 77, "Hello World"))),
+ Right(12000L),
+ Left((14000L, (7L, 18, "Hello World"))),
+ Right(14000L),
+ Left((15000L, (8L, 8, "Hello World"))),
+ Right(17000L),
+ Left((20000L, (20L, 20, "Hello World"))),
+ Right(19000L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, b, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ "preceding AND CURRENT ROW)" +
+ ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ " preceding AND CURRENT ROW)" +
+ " from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+ "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+ "Hello,3,4,9",
+ "Hello,4,2,7",
+ "Hello,5,2,9",
+ "Hello,6,2,11","Hello,65,2,12",
+ "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+ "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+ "Hello World,8,2,15",
+ "Hello World,20,1,20")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
+ val data = Seq(
+ Left((1500L, (1L, 15, "Hello"))),
+ Left((1600L, (1L, 16, "Hello"))),
+ Left((1000L, (1L, 1, "Hello"))),
+ Left((2000L, (2L, 2, "Hello"))),
+ Right(1000L),
+ Left((2000L, (2L, 2, "Hello"))),
+ Left((2000L, (2L, 3, "Hello"))),
+ Left((3000L, (3L, 3, "Hello"))),
+ Right(2000L),
+ Left((4000L, (4L, 4, "Hello"))),
+ Right(3000L),
+ Left((5000L, (5L, 5, "Hello"))),
+ Right(5000L),
+ Left((6000L, (6L, 6, "Hello"))),
+ Left((6500L, (6L, 65, "Hello"))),
+ Right(7000L),
+ Left((9000L, (6L, 9, "Hello"))),
+ Left((9500L, (6L, 18, "Hello"))),
+ Left((9000L, (6L, 9, "Hello"))),
+ Right(10000L),
+ Left((10000L, (7L, 7, "Hello World"))),
+ Left((11000L, (7L, 17, "Hello World"))),
+ Left((11000L, (7L, 77, "Hello World"))),
+ Right(12000L),
+ Left((14000L, (7L, 18, "Hello World"))),
+ Right(14000L),
+ Left((15000L, (8L, 8, "Hello World"))),
+ Right(17000L),
+ Left((20000L, (20L, 20, "Hello World"))),
+ Right(19000L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, b, " +
+ "count(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ "preceding AND CURRENT ROW)" +
+ ", sum(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ " preceding AND CURRENT ROW)" +
+ " from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+ "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9",
+ "Hello,3,4,9",
+ "Hello,4,2,7",
+ "Hello,5,2,9",
+ "Hello,6,2,11","Hello,65,2,12",
+ "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18",
+ "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+ "Hello World,8,2,15",
+ "Hello World,20,1,20")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
/**
* All aggregates must be computed on the same window.
*/
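The only expected result that differs from the partitioned test is
"Hello World,7,4,25" (instead of "Hello World,7,1,7"): without
PARTITION BY, the window [9000, 10000] of the row at timestamp 10000 also
contains the three "Hello" rows at timestamps 9000 and 9500 (a = 6 each),
giving count 4 and sum 6 + 6 + 6 + 7 = 25.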
http://git-wip-us.apache.org/repos/asf/flink/blob/d4665a00/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 7b8b2df..45d204a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
+ @Test
+ def testBoundPartitionedRowTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() " +
+ "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testBoundNonPartitionedRowTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY RowTime() " +
+ "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
}
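The plan tests also document the unit conversion: the INTERVAL '1' SECOND
bound from the SQL surfaces in the physical plan as
term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), i.e. the
precedingOffset handed to the process function is expressed in milliseconds.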