You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:05:00 UTC
[49/50] [abbrv] flink git commit: [FLINK-6200] [table] Add support
for unbounded event-time OVER RANGE window.
[FLINK-6200] [table] Add support for unbounded event-time OVER RANGE window.
This closes #3649.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44f9c76a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44f9c76a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44f9c76a
Branch: refs/heads/table-retraction
Commit: 44f9c76a9ff50e95947c9f78a86b485f564e3796
Parents: aa3c395
Author: hongyuhong 00223286 <ho...@huawei.com>
Authored: Wed Mar 29 10:29:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Mar 30 22:12:21 2017 +0200
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 20 +--
.../table/runtime/aggregate/AggregateUtil.scala | 26 ++-
.../UnboundedEventTimeOverProcessFunction.scala | 163 +++++++++++++++----
.../table/api/scala/stream/sql/SqlITCase.scala | 132 +++++++++++++++
4 files changed, 292 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 7b744f1..e24dd23 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -127,14 +127,8 @@ class DataStreamOverAggregate(
// row-time OVER window
if (overWindow.lowerBound.isPreceding &&
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
- if (overWindow.isRows) {
- // unbounded preceding OVER ROWS window
- createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
- } else {
- // unbounded preceding OVER RANGE window
- throw new TableException(
- "row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
- }
+ // ROWS/RANGE clause unbounded OVER window
+ createUnboundedAndCurrentRowEventTimeOverWindow(inputDS, overWindow.isRows)
} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
if (overWindow.isRows) {
@@ -202,8 +196,8 @@ class DataStreamOverAggregate(
def createBoundedAndCurrentRowOverWindow(
inputDS: DataStream[Row],
- isRangeClause: Boolean = false,
- isRowTimeType: Boolean = false): DataStream[Row] = {
+ isRangeClause: Boolean,
+ isRowTimeType: Boolean): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -247,7 +241,8 @@ class DataStreamOverAggregate(
}
def createUnboundedAndCurrentRowEventTimeOverWindow(
- inputDS: DataStream[Row]): DataStream[Row] = {
+ inputDS: DataStream[Row],
+ isRows: Boolean): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -258,7 +253,8 @@ class DataStreamOverAggregate(
val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
namedAggregates,
- inputType)
+ inputType,
+ isRows)
val result: DataStream[Row] =
// partitioned aggregation
http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index cbb2e53..93ab7b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -152,7 +152,8 @@ object AggregateUtil {
*/
private[flink] def createUnboundedEventTimeOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+ inputType: RelDataType,
+ isRows: Boolean): UnboundedEventTimeOverProcessFunction = {
val (aggFields, aggregates) =
transformToAggregateFunctions(
@@ -162,12 +163,23 @@ object AggregateUtil {
val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
- new UnboundedEventTimeOverProcessFunction(
- aggregates,
- aggFields,
- inputType.getFieldCount,
- aggregationStateType,
- FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ if (isRows) {
+ // ROWS unbounded over process function
+ new UnboundedEventTimeRowsOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ } else {
+ // RANGE unbounded over process function
+ new UnboundedEventTimeRangeOverProcessFunction(
+ aggregates,
+ aggFields,
+ inputType.getFieldCount,
+ aggregationStateType,
+ FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
index 7616ede..92faf7d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -41,7 +41,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
* @param inputType the input row tye which the state saved
*
*/
-class UnboundedEventTimeOverProcessFunction(
+abstract class UnboundedEventTimeOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
private val aggFields: Array[Int],
private val forwardedFieldCount: Int,
@@ -53,7 +53,7 @@ class UnboundedEventTimeOverProcessFunction(
Preconditions.checkNotNull(aggFields)
Preconditions.checkArgument(aggregates.length == aggFields.length)
- private var output: Row = _
+ protected var output: Row = _
// state to hold the accumulators of the aggregations
private var accumulatorState: ValueState[Row] = _
// state to hold rows until the next watermark arrives
@@ -162,30 +162,9 @@ class UnboundedEventTimeOverProcessFunction(
val curRowList = rowMapState.get(curTimestamp)
collector.setAbsoluteTimestamp(curTimestamp)
- var j = 0
- while (j < curRowList.size) {
- val curRow = curRowList.get(j)
- i = 0
-
- // copy forwarded fields to output row
- while (i < forwardedFieldCount) {
- output.setField(i, curRow.getField(i))
- i += 1
- }
-
- // update accumulators and copy aggregates to output row
- i = 0
- while (i < aggregates.length) {
- val index = forwardedFieldCount + i
- val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
- output.setField(index, aggregates(i).getValue(accumulator))
- i += 1
- }
- // emit output row
- collector.collect(output)
- j += 1
- }
+ // process the same timestamp datas, the mechanism is different according ROWS or RANGE
+ processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
+
rowMapState.remove(curTimestamp)
}
@@ -204,21 +183,145 @@ class UnboundedEventTimeOverProcessFunction(
* If timestamps arrive in order (as in case of using the RocksDB state backend) this is just
* an append with O(1).
*/
- private def insertToSortedList(recordTimeStamp: Long) = {
+ private def insertToSortedList(recordTimestamp: Long) = {
val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
var continue = true
while (listIterator.hasPrevious && continue) {
val timestamp = listIterator.previous
- if (recordTimeStamp >= timestamp) {
+ if (recordTimestamp >= timestamp) {
listIterator.next
- listIterator.add(recordTimeStamp)
+ listIterator.add(recordTimestamp)
continue = false
}
}
if (continue) {
- sortedTimestamps.addFirst(recordTimeStamp)
+ sortedTimestamps.addFirst(recordTimestamp)
}
}
+ /**
+ * Process the same timestamp datas, the mechanism is different between
+ * rows and range window.
+ */
+ def processElementsWithSameTimestamp(
+ curRowList: JList[Row],
+ lastAccumulator: Row,
+ out: Collector[Row]): Unit
+
+}
+
+/**
+ * A ProcessFunction to support unbounded ROWS window.
+ * The ROWS clause defines on a physical level how many rows are included in a window frame.
+ */
+class UnboundedEventTimeRowsOverProcessFunction(
+ aggregates: Array[AggregateFunction[_]],
+ aggFields: Array[Int],
+ forwardedFieldCount: Int,
+ intermediateType: TypeInformation[Row],
+ inputType: TypeInformation[Row])
+ extends UnboundedEventTimeOverProcessFunction(
+ aggregates,
+ aggFields,
+ forwardedFieldCount,
+ intermediateType,
+ inputType) {
+
+ override def processElementsWithSameTimestamp(
+ curRowList: JList[Row],
+ lastAccumulator: Row,
+ out: Collector[Row]): Unit = {
+
+ var j = 0
+ var i = 0
+ while (j < curRowList.size) {
+ val curRow = curRowList.get(j)
+ i = 0
+
+ // copy forwarded fields to output row
+ while (i < forwardedFieldCount) {
+ output.setField(i, curRow.getField(i))
+ i += 1
+ }
+
+ // update accumulators and copy aggregates to output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ // emit output row
+ out.collect(output)
+ j += 1
+ }
+ }
+}
+
+
+/**
+ * A ProcessFunction to support unbounded RANGE window.
+ * The RANGE option includes all the rows within the window frame
+ * that have the same ORDER BY values as the current row.
+ */
+class UnboundedEventTimeRangeOverProcessFunction(
+ aggregates: Array[AggregateFunction[_]],
+ aggFields: Array[Int],
+ forwardedFieldCount: Int,
+ intermediateType: TypeInformation[Row],
+ inputType: TypeInformation[Row])
+ extends UnboundedEventTimeOverProcessFunction(
+ aggregates,
+ aggFields,
+ forwardedFieldCount,
+ intermediateType,
+ inputType) {
+
+ override def processElementsWithSameTimestamp(
+ curRowList: JList[Row],
+ lastAccumulator: Row,
+ out: Collector[Row]): Unit = {
+
+ var j = 0
+ var i = 0
+ // all same timestamp data should have same aggregation value.
+ while (j < curRowList.size) {
+ val curRow = curRowList.get(j)
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ i += 1
+ }
+ j += 1
+ }
+
+ // emit output row
+ j = 0
+ while (j < curRowList.size) {
+ val curRow = curRowList.get(j)
+
+ // copy forwarded fields to output row
+ i = 0
+ while (i < forwardedFieldCount) {
+ output.setField(i, curRow.getField(i))
+ i += 1
+ }
+
+ //copy aggregates to output row
+ i = 0
+ while (i < aggregates.length) {
+ val index = forwardedFieldCount + i
+ val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
+ output.setField(index, aggregates(i).getValue(accumulator))
+ i += 1
+ }
+ out.collect(output)
+ j += 1
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f9c76a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index b8285a1..0d3a46c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -840,6 +840,138 @@ class SqlITCase extends StreamingWithStateTestBase {
"6,8,Hello world,51,9,5,9,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ /** test sliding event-time non-partitioned unbounded RANGE window **/
+ @Test
+ def testUnboundedNonPartitionedEventTimeRangeWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,1,Hello,1,1,1,1,1",
+ "1,1,Hello,7,4,1,3,1",
+ "1,2,Hello,7,4,1,3,1",
+ "1,3,Hello world,7,4,1,3,1",
+ "2,2,Hello world,12,6,2,3,1",
+ "2,3,Hello world,12,6,2,3,1",
+ "1,1,Hi,13,7,1,3,1",
+ "1,4,Hello world,17,8,2,4,1",
+ "1,5,Hello world,35,11,3,7,1",
+ "1,6,Hello world,35,11,3,7,1",
+ "1,7,Hello world,35,11,3,7,1",
+ "2,4,Hello world,44,13,3,7,1",
+ "2,5,Hello world,44,13,3,7,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test sliding event-time unbounded RANGE window **/
+ @Test
+ def testUnboundedPartitionedEventTimeRangeWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hello,6,3,2,3,1",
+ "1,2,Hello,6,3,2,3,1",
+ "1,3,Hello world,6,3,2,3,1",
+ "1,1,Hi,7,4,1,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,6,3,2,3,1",
+ "2,3,Hello world,6,3,2,3,1",
+ "1,4,Hello world,11,5,2,4,1",
+ "1,5,Hello world,29,8,3,7,1",
+ "1,6,Hello world,29,8,3,7,1",
+ "1,7,Hello world,29,8,3,7,1",
+ "2,4,Hello world,15,5,3,5,1",
+ "2,5,Hello world,15,5,3,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
object SqlITCase {