Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:31 UTC
[20/50] [abbrv] flink git commit: [FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.
[FLINK-5658] [table] Add event-time OVER ROWS/RANGE UNBOUNDED PRECEDING aggregation to SQL.
This closes #3386.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2c61a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2c61a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2c61a2
Branch: refs/heads/table-retraction
Commit: fe2c61a28e6a5300b2cf4c1e50ee884b51ef42c9
Parents: 7a9d39f
Author: hongyuhong 00223286 <ho...@huawei.com>
Authored: Fri Mar 24 09:31:59 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 62 +++--
.../table/runtime/aggregate/AggregateUtil.scala | 70 +++--
.../UnboundedEventTimeOverProcessFunction.scala | 224 ++++++++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 263 ++++++++++++++++++-
.../scala/stream/sql/WindowAggregateTest.scala | 64 ++++-
5 files changed, 634 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
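For context, this is the query shape the commit enables, as exercised by the ITCases below. A minimal sketch, assuming a registered event-time table T1 with fields a, b, c (environment setup as in SqlITCase):

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val sqlQuery = "SELECT a, b, c, " +
  "SUM(b) OVER (PARTITION BY a ORDER BY rowtime() " +
  "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
  "FROM T1"

val result = tEnv.sql(sqlQuery).toDataStream[Row]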
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 547c875..3dd7ee2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -104,7 +104,7 @@ class DataStreamOverAggregate(
case _: ProcTimeType =>
// proc-time OVER window
if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
- // non-bounded OVER window
+ // unbounded preceding OVER window
createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
} else if (
overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
@@ -126,23 +126,15 @@ class DataStreamOverAggregate(
}
case _: RowTimeType =>
// row-time OVER window
- if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
- // non-bounded OVER window
- if (overWindow.isRows) {
- // ROWS clause unbounded OVER window
- throw new TableException(
- "ROWS clause unbounded row-time OVER window no supported yet.")
- } else {
- // RANGE clause unbounded OVER window
- throw new TableException(
- "RANGE clause unbounded row-time OVER window no supported yet.")
- }
- } else if (overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
- overWindow.upperBound.isCurrentRow) {
+ if (overWindow.lowerBound.isPreceding &&
+ overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // unbounded preceding OVER window
+ createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
+ } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
if (overWindow.isRows) {
// ROWS clause bounded OVER window
- createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, true)
+ createRowsClauseBoundedAndCurrentRowOverWindow(inputDS, isRowTimeType = true)
} else {
// RANGE clause bounded OVER window
throw new TableException(
@@ -187,7 +179,7 @@ class DataStreamOverAggregate(
val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
namedAggregates,
inputType,
- false)
+ isPartitioned = false)
inputDS
.process(processFunction).setParallelism(1).setMaxParallelism(1)
@@ -205,7 +197,6 @@ class DataStreamOverAggregate(
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
- val inputFields = (0 until inputType.getFieldCount).toArray
val precedingOffset =
getLowerBoundary(logicWindow, overWindow, getInput()) + 1
@@ -216,7 +207,6 @@ class DataStreamOverAggregate(
val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
namedAggregates,
inputType,
- inputFields,
precedingOffset,
isRowTimeType
)
@@ -244,6 +234,42 @@ class DataStreamOverAggregate(
result
}
+ def createUnboundedAndCurrentRowEventTimeOverWindow(
+ inputDS: DataStream[Row]): DataStream[Row] = {
+
+ val overWindow: Group = logicWindow.groups.get(0)
+ val partitionKeys: Array[Int] = overWindow.keys.toArray
+ val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+ // get the output types
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
+ namedAggregates,
+ inputType)
+
+ val result: DataStream[Row] =
+ // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ inputDS.keyBy(partitionKeys: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ // global non-partitioned aggregation
+ else {
+ inputDS.keyBy(new NullByteKeySelector[Row])
+ .process(processFunction)
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
+ }
+
private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = {
val overWindow: Group = logicWindow.groups.get(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 0084ee5..fdac692 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -104,7 +104,6 @@ object AggregateUtil {
private[flink] def createRowsClauseBoundedOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
- inputFields: Array[Int],
precedingOffset: Long,
isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
@@ -114,26 +113,49 @@ object AggregateUtil {
inputType,
needRetraction = true)
- val aggregationStateType: RowTypeInfo =
- createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+ val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
- val inputRowType: RowTypeInfo =
- createDataSetAggregateBufferDataType(inputFields, Array(), inputType)
+ if (isRowTimeType) {
+ new RowsClauseBoundedOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ inputRowType,
+ precedingOffset
+ )
+ } else {
+ throw TableException(
+ "Bounded partitioned proc-time OVER aggregation is not supported yet.")
+ }
+ }
- val processFunction = if (isRowTimeType) {
- new RowsClauseBoundedOverProcessFunction(
- aggregates,
- aggFields,
- inputType.getFieldCount,
- aggregationStateType,
- inputRowType,
- precedingOffset
- )
- } else {
- throw TableException(
- "Bounded partitioned proc-time OVER aggregation is not supported yet.")
- }
- processFunction
+ /**
+ * Create a [[ProcessFunction]] to evaluate the final aggregate value.
+ *
+ * @param namedAggregates List of calls to aggregate functions and their output field names
+ * @param inputType Input row type
+ * @return [[UnboundedEventTimeOverProcessFunction]]
+ */
+ private[flink] def createUnboundedEventTimeOverProcessFunction(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+ val (aggFields, aggregates) =
+ transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = false)
+
+ val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+ new UnboundedEventTimeOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
}
/**
@@ -595,7 +617,7 @@ object AggregateUtil {
// compute preaggregation type
val preAggFieldTypes = gkeyInFields
.map(inputType.getFieldList.get(_).getType)
- .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(inputType, aggregates)
+ .map(FlinkTypeFactory.toTypeInfo) ++ createAccumulatorType(aggregates)
val preAggRowType = new RowTypeInfo(preAggFieldTypes: _*)
(
@@ -701,7 +723,7 @@ object AggregateUtil {
val aggResultTypes = namedAggregates.map(a => FlinkTypeFactory.toTypeInfo(a.left.getType))
- val accumulatorRowType = createAccumulatorRowType(inputType, aggregates)
+ val accumulatorRowType = createAccumulatorRowType(aggregates)
val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
val aggFunction = new AggregateAggFunction(aggregates, aggFields)
@@ -1029,7 +1051,6 @@ object AggregateUtil {
}
private def createAccumulatorType(
- inputType: RelDataType,
aggregates: Array[TableAggregateFunction[_]]): Seq[TypeInformation[_]] = {
val aggTypes: Seq[TypeInformation[_]] =
@@ -1068,7 +1089,7 @@ object AggregateUtil {
.map(FlinkTypeFactory.toTypeInfo)
// get all field data types of all intermediate aggregates
- val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+ val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
// concat group key types, aggregation types, and window key types
val allFieldTypes: Seq[TypeInformation[_]] = windowKeyTypes match {
@@ -1079,10 +1100,9 @@ object AggregateUtil {
}
private def createAccumulatorRowType(
- inputType: RelDataType,
aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
- val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(inputType, aggregates)
+ val aggTypes: Seq[TypeInformation[_]] = createAccumulatorType(aggregates)
new RowTypeInfo(aggTypes: _*)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
new file mode 100644
index 0000000..7616ede
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+ * A ProcessFunction to support unbounded event-time OVER windows.
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the field indexes used by the aggregate functions
+ * @param forwardedFieldCount the number of input fields forwarded to the output
+ * @param intermediateType the row type of the accumulator state
+ * @param inputType the row type of the input rows buffered in state
+ *
+ */
+class UnboundedEventTimeOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val intermediateType: TypeInformation[Row],
+ private val inputType: TypeInformation[Row])
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+ private var output: Row = _
+ // state to hold the accumulators of the aggregations
+ private var accumulatorState: ValueState[Row] = _
+ // state to hold rows until the next watermark arrives
+ private var rowMapState: MapState[Long, JList[Row]] = _
+ // list to sort timestamps to access rows in timestamp order
+ private var sortedTimestamps: util.LinkedList[Long] = _
+
+
+ override def open(config: Configuration) {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ sortedTimestamps = new util.LinkedList[Long]()
+
+ // initialize accumulator state
+ val accDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+ accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
+
+ // initialize row state
+ val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+ rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+ }
+
+ /**
+ * Puts an element from the input stream into state if it is not late.
+ * Registers a timer for the next watermark.
+ *
+ * @param input The input value.
+ * @param ctx The context to register timers and query the current time
+ * @param out The collector for returning result values.
+ *
+ */
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ val timestamp = ctx.timestamp()
+ val curWatermark = ctx.timerService().currentWatermark()
+
+ // discard late record
+ if (timestamp >= curWatermark) {
+ // ensure that every key registers only one timer
+ ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+
+ // put row into state
+ var rowList = rowMapState.get(timestamp)
+ if (rowList == null) {
+ rowList = new util.ArrayList[Row]()
+ }
+ rowList.add(input)
+ rowMapState.put(timestamp, rowList)
+ }
+ }
+
+ /**
+ * Called when a watermark arrives.
+ * Sorts the buffered records by timestamp, computes the aggregates, and emits all records
+ * with a timestamp smaller than or equal to the watermark in timestamp order.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx The context to register timers and query the current time
+ * @param out The collector for returning result values.
+ */
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+
+ Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
+ val collector = out.asInstanceOf[TimestampedCollector[Row]]
+
+ val keyIterator = rowMapState.keys.iterator
+ if (keyIterator.hasNext) {
+ val curWatermark = ctx.timerService.currentWatermark
+ var existEarlyRecord: Boolean = false
+ var i = 0
+
+ // sort the record timestamps
+ do {
+ val recordTime = keyIterator.next
+ // only take timestamps smaller/equal to the watermark
+ if (recordTime <= curWatermark) {
+ insertToSortedList(recordTime)
+ } else {
+ existEarlyRecord = true
+ }
+ } while (keyIterator.hasNext)
+
+ // get last accumulator
+ var lastAccumulator = accumulatorState.value
+ if (lastAccumulator == null) {
+ // initialize accumulator
+ lastAccumulator = new Row(aggregates.length)
+ while (i < aggregates.length) {
+ lastAccumulator.setField(i, aggregates(i).createAccumulator())
+ i += 1
+ }
+ }
+
+ // emit the rows in order
+ while (!sortedTimestamps.isEmpty) {
+ val curTimestamp = sortedTimestamps.removeFirst()
+ val curRowList = rowMapState.get(curTimestamp)
+ collector.setAbsoluteTimestamp(curTimestamp)
+
+ var j = 0
+ while (j < curRowList.size) {
+ val curRow = curRowList.get(j)
+ i = 0
+
+ // copy forwarded fields to output row
+ while (i < forwardedFieldCount) {
+ output.setField(i, curRow.getField(i))
+ i += 1
+ }
+
+ // update accumulators and copy aggregates to output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ // emit output row
+ collector.collect(output)
+ j += 1
+ }
+ rowMapState.remove(curTimestamp)
+ }
+
+ accumulatorState.update(lastAccumulator)
+
+ // if there are rows with timestamp > watermark, register a timer for the next watermark
+ if (existEarlyRecord) {
+ ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+ }
+ }
+ }
+
+ /**
+ * Inserts timestamps in order into a linked list.
+ *
+ * If timestamps arrive in order (as is the case with the RocksDB state backend), this is
+ * just an O(1) append.
+ */
+ private def insertToSortedList(recordTimeStamp: Long) = {
+ val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
+ var continue = true
+ while (listIterator.hasPrevious && continue) {
+ val timestamp = listIterator.previous
+ if (recordTimeStamp >= timestamp) {
+ listIterator.next
+ listIterator.add(recordTimeStamp)
+ continue = false
+ }
+ }
+
+ if (continue) {
+ sortedTimestamps.addFirst(recordTimeStamp)
+ }
+ }
+
+}
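For illustration, a self-contained sketch (plain Scala, no Flink dependencies; all names are hypothetical) of the ordered-insert idea behind insertToSortedList above; scanning from the tail makes in-order input an O(1) append:

import java.util.{LinkedList => JLinkedList}

object SortedInsertSketch {
  // Scan backwards from the tail; for ascending input the first comparison hits.
  def insertSorted(list: JLinkedList[Long], ts: Long): Unit = {
    val it = list.listIterator(list.size)
    var inserted = false
    while (it.hasPrevious && !inserted) {
      if (ts >= it.previous()) {
        it.next()   // step forward over the smaller-or-equal element
        it.add(ts)  // insert directly after it
        inserted = true
      }
    }
    if (!inserted) list.addFirst(ts) // ts is the new minimum
  }

  def main(args: Array[String]): Unit = {
    val l = new JLinkedList[Long]()
    Seq(1L, 3L, 2L, 5L).foreach(insertSorted(l, _))
    println(l) // [1, 2, 3, 5]
  }
}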
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 19350a7..34a68b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,8 +19,8 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{TableEnvironment, TableException}
@@ -436,6 +436,266 @@ class SqlITCase extends StreamingWithStateTestBase {
env.execute()
}
+ /** test unbounded event-time OVER window with partition by **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ // the next 3 elements are late
+ Left(14000008L, (1, 4L, "Hello world")),
+ Left(14000008L, (2, 3L, "Hello world")),
+ Left(14000008L, (3, 3L, "Hello world")),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ // the next 3 elements are late
+ Left(14000019L, (1, 6L, "Hello world")),
+ Left(14000018L, (2, 4L, "Hello world")),
+ Left(14000018L, (3, 4L, "Hello world")),
+ Left(14000022L, (2, 5L, "Hello world")),
+ Left(14000022L, (3, 5L, "Hello world")),
+ Left(14000024L, (1, 7L, "Hello world")),
+ Left(14000023L, (1, 8L, "Hello world")),
+ Left(14000021L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test unbounded event-time OVER window with partition by, with parallelism > 1 **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Left(14000024L, (3, 5L, "Hello world")),
+ Left(14000026L, (1, 7L, "Hello world")),
+ Left(14000025L, (1, 8L, "Hello world")),
+ Left(14000022L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test unbounded event-time OVER window without partition by **/
+ @Test
+ def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ Left(14000004L, (3, 7L, "Hello world")),
+ Left(14000007L, (4, 9L, "Hello world")),
+ Left(14000008L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ // this element will be discarded because it is late
+ Left(14000008L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "3,7,Hello world,17,4,4,7,2",
+ "1,1,Hi,18,5,3,7,1",
+ "4,9,Hello world,27,6,4,9,1",
+ "5,8,Hello world,35,7,5,9,1",
+ "6,8,Hello world,43,8,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test unbounded event-time OVER window without partition by, with early-arriving records **/
+ @Test
+ def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ // next three elements are early
+ Left(14000012L, (3, 7L, "Hello world")),
+ Left(14000013L, (4, 9L, "Hello world")),
+ Left(14000014L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ Left(14000011L, (6, 8L, "Hello world")),
+ // next element is early
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "1,1,Hi,11,4,2,5,1",
+ "6,8,Hello world,19,5,3,8,1",
+ "3,7,Hello world,26,6,4,8,1",
+ "4,9,Hello world,35,7,5,9,1",
+ "5,8,Hello world,43,8,5,9,1",
+ "6,8,Hello world,51,9,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
object SqlITCase {
@@ -451,5 +711,4 @@ object SqlITCase {
override def cancel(): Unit = ???
}
-
}
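Note on the test data format used throughout these ITCases: records are encoded as Left((timestamp, element)) and watermarks as Right(timestamp). The EventTimeSourceFunction that consumes them is truncated in this excerpt; a minimal sketch of such a source, assuming the imports already present in SqlITCase (SourceFunction, Watermark):

class EventTimeSourceFunction[T](
    dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left((ts, element)) => ctx.collectWithTimestamp(element, ts)
      case Right(wm) => ctx.emitWatermark(new Watermark(wm))
    }
  }

  override def cancel(): Unit = ???
}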
http://git-wip-us.apache.org/repos/asf/flink/blob/fe2c61a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 9a425b3..7b8b2df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -241,12 +241,67 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ def testUnboundNonPartitionedEventTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testUnboundPartitionedEventTimeWindowWithRange() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "ROWTIME() AS $2")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "ROWTIME"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
def testBoundPartitionedRowTimeWindowWithRow() = {
val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
val expected =
unaryNode(
@@ -294,4 +349,5 @@ class WindowAggregateTest extends TableTestBase {
)
streamUtil.verifySql(sql, expected)
}
+
}