You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/03 15:53:19 UTC
flink git commit: [FLINK-5219] [table] Add non-grouped session windows for batch tables.
Repository: flink
Updated Branches:
refs/heads/master c31f95cab -> 728c936dd
[FLINK-5219] [table] Add non-grouped session windows for batch tables.
This closes #3266.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/728c936d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/728c936d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/728c936d
Branch: refs/heads/master
Commit: 728c936dd2ac18701e1d8696da251aec351b2ae6
Parents: c31f95c
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Fri Mar 3 10:39:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 3 16:52:13 2017 +0100
----------------------------------------------------------------------
.../nodes/dataset/DataSetWindowAggregate.scala | 96 ++++++---
.../table/runtime/aggregate/AggregateUtil.scala | 57 +++++-
...ionWindowAggregateCombineGroupFunction.scala | 168 ----------------
...aSetSessionWindowAggregatePreProcessor.scala | 197 +++++++++++++++++++
.../dataset/DataSetWindowAggregateITCase.scala | 21 +-
5 files changed, 331 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 597be8c..fb5ff3b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -194,33 +194,32 @@ class DataSetWindowAggregate(
val groupingKeys = grouping.indices.toArray
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- // grouping window
- if (groupingKeys.length > 0) {
- // create mapFunction for initializing the aggregations
- val mapFunction = createDataSetWindowPrepareMapFunction(
- window,
- namedAggregates,
- grouping,
- inputType,
- isParserCaseSensitive)
-
- val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
+ // create mapFunction for initializing the aggregations
+ val mapFunction = createDataSetWindowPrepareMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType,
+ isParserCaseSensitive)
- val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+ val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
- // the position of the rowtime field in the intermediate result for map output
- val rowTimeFieldPos = mapReturnType.getArity - 1
+ val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
- // do incremental aggregation
- if (doAllSupportPartialMerge(
- namedAggregates.map(_.getKey),
- inputType,
- grouping.length)) {
+ // the position of the rowtime field in the intermediate result for map output
+ val rowTimeFieldPos = mapReturnType.getArity - 1
- // gets the window-start and window-end position in the intermediate result.
- val windowStartPos = rowTimeFieldPos
- val windowEndPos = windowStartPos + 1
+ // do incremental aggregation
+ if (doAllSupportPartialMerge(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)) {
+ // gets the window-start and window-end position in the intermediate result.
+ val windowStartPos = rowTimeFieldPos
+ val windowEndPos = windowStartPos + 1
+ // grouping window
+ if (groupingKeys.length > 0) {
// create groupCombineFunction for combine the aggregations
val combineGroupFunction = createDataSetWindowAggregationCombineFunction(
window,
@@ -248,9 +247,38 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
+ } else {
+ // non-grouping window
+ val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+ window,
+ namedAggregates,
+ inputType,
+ grouping)
+
+ // create groupReduceFunction for calculating the aggregations
+ val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties,
+ isInputCombined = true)
+
+ mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+ .mapPartition(mapPartitionFunction)
+ .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+ .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggregateOperatorName)
+ .asInstanceOf[DataSet[Row]]
}
- // do non-incremental aggregation
- else {
+ // do non-incremental aggregation
+ } else {
+ // grouping window
+ if (groupingKeys.length > 0) {
+
// create groupReduceFunction for calculating the aggregations
val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
window,
@@ -265,13 +293,23 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
+ } else {
+ // non-grouping window
+ val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggregateOperatorName)
+ .asInstanceOf[DataSet[Row]]
}
}
- // non-grouping window
- else {
- throw new UnsupportedOperationException(
- "Session non-grouping windows on event-time are currently not supported.")
- }
}
private def prepareOperatorName: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 40468ad..d549c37 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{InvalidTypesException, MapFunction, RichGroupCombineFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -246,6 +246,57 @@ object AggregateUtil {
}
/**
+ * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+ * for aggregates.
+ * The function returns aggregate values of all aggregate function which are
+ * organized by the following format:
+ *
+ * {{{
+ * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5
+ * | | windowEnd(max(rowtime)
+ * | | |
+ * v v v
+ * +--------+--------+--------+--------+-----------+---------+
+ * | sum1 | count1 | sum2 | count2 |windowStart|windowEnd|
+ * +--------+--------+--------+--------+-----------+---------+
+ * ^ ^
+ * | |
+ * sum(y) aggOffsetInRow = 4 windowStart(min(rowtime))
+ *
+ * }}}
+ *
+ */
+ def createDataSetWindowAggregationMapPartitionFunction(
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ groupings: Array[Int]): MapPartitionFunction[Row, Row] = {
+
+ val aggregates = transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ 0)._2
+
+ window match {
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ val combineReturnType: RowTypeInfo =
+ createDataSetAggregateBufferDataType(
+ groupings,
+ aggregates,
+ inputType,
+ Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+ new DataSetSessionWindowAggregatePreProcessor(
+ aggregates,
+ groupings,
+ asLong(gap),
+ combineReturnType)
+ case _ =>
+ throw new UnsupportedOperationException(s"$window is currently not supported on batch")
+ }
+ }
+
+ /**
* Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that pre-aggregation
* for aggregates.
* The function returns intermediate aggregate values of all aggregate function which are
@@ -268,7 +319,7 @@ object AggregateUtil {
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
groupings: Array[Int])
- : RichGroupCombineFunction[Row, Row] = {
+ : GroupCombineFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -284,7 +335,7 @@ object AggregateUtil {
inputType,
Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
- new DataSetSessionWindowAggregateCombineGroupFunction(
+ new DataSetSessionWindowAggregatePreProcessor(
aggregates,
groupings,
asLong(gap),
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
deleted file mode 100644
index 88cd19f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.flink.api.common.functions.RichGroupCombineFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
- * This wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
- *
- * @param aggregates The aggregate functions.
- * @param groupingKeys The indexes of the grouping fields.
- * @param gap Session time window gap.
- * @param intermediateRowType Intermediate row data type.
- */
-class DataSetSessionWindowAggregateCombineGroupFunction(
- aggregates: Array[AggregateFunction[_ <: Any]],
- groupingKeys: Array[Int],
- gap: Long,
- @transient intermediateRowType: TypeInformation[Row])
- extends RichGroupCombineFunction[Row, Row] with ResultTypeQueryable[Row] {
-
- private var aggregateBuffer: Row = _
- private val accumStartPos: Int = groupingKeys.length
- private val rowTimeFieldPos = accumStartPos + aggregates.length
-
- val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
- new JArrayList[Accumulator](2)
- }
-
- override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupingKeys)
- aggregateBuffer = new Row(rowTimeFieldPos + 2)
-
- // init lists with two empty accumulators
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).add(accumulator)
- accumulatorList(i).add(accumulator)
- }
- }
-
- /**
- * For sub-grouped intermediate aggregate Rows, divide window based on the rowtime
- * (current'rowtime - previous’rowtime > gap), and then merge data (within a unified window)
- * into an aggregate buffer.
- *
- * @param records Sub-grouped intermediate aggregate Rows.
- * @return Combined intermediate aggregate Row.
- *
- */
- override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
-
- var windowStart: java.lang.Long = null
- var windowEnd: java.lang.Long = null
- var currentRowTime: java.lang.Long = null
-
- // reset first accumulator in merge list
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- }
-
- val iterator = records.iterator()
-
- while (iterator.hasNext) {
- val record = iterator.next()
- currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
- // initial traversal or opening a new window
- if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
-
- // calculate the current window and open a new window.
- if (windowEnd != null) {
- // emit the current window's merged data
- doCollect(out, accumulatorList, windowStart, windowEnd)
-
- // reset first value of accumulator list
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- }
- } else {
- // set group keys to aggregateBuffer.
- for (i <- groupingKeys.indices) {
- aggregateBuffer.setField(i, record.getField(i))
- }
- }
-
- windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
- }
-
- for (i <- aggregates.indices) {
- // insert received accumulator into acc list
- val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
- accumulatorList(i).set(1, newAcc)
- // merge acc list
- val retAcc = aggregates(i).merge(accumulatorList(i))
- // insert result into acc list
- accumulatorList(i).set(0, retAcc)
- }
-
- // the current rowtime is the last rowtime of the next calculation.
- windowEnd = currentRowTime + gap
- }
- // emit the merged data of the current window.
- doCollect(out, accumulatorList, windowStart, windowEnd)
- }
-
- /**
- * Emit the merged data of the current window.
- *
- * @param out the collection of the aggregate results
- * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
- * each aggregate
- * @param windowStart the window's start attribute value is the min (rowtime)
- * of all rows in the window.
- * @param windowEnd the window's end property value is max (rowtime) + gap
- * for all rows in the window.
- */
- def doCollect(
- out: Collector[Row],
- accumulatorList: Array[JArrayList[Accumulator]],
- windowStart: Long,
- windowEnd: Long): Unit = {
-
- // merge the accumulators into one accumulator
- for (i <- aggregates.indices) {
- aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
- }
-
- // intermediate Row WindowStartPos is rowtime pos.
- aggregateBuffer.setField(rowTimeFieldPos, windowStart)
-
- // intermediate Row WindowEndPos is rowtime pos + 1.
- aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
-
- out.collect(aggregateBuffer)
- }
-
- override def getProducedType: TypeInformation[Row] = {
- intermediateRowType
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
new file mode 100644
index 0000000..a299c40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * This wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+ *
+ * @param aggregates The aggregate functions.
+ * @param groupingKeys The indexes of the grouping fields.
+ * @param gap Session time window gap.
+ * @param intermediateRowType Intermediate row data type.
+ */
+class DataSetSessionWindowAggregatePreProcessor(
+ aggregates: Array[AggregateFunction[_ <: Any]],
+ groupingKeys: Array[Int],
+ gap: Long,
+ @transient intermediateRowType: TypeInformation[Row])
+ extends AbstractRichFunction
+ with MapPartitionFunction[Row,Row]
+ with GroupCombineFunction[Row,Row]
+ with ResultTypeQueryable[Row] {
+
+ private var aggregateBuffer: Row = _
+ private val accumStartPos: Int = groupingKeys.length
+ private val rowTimeFieldPos = accumStartPos + aggregates.length
+
+ val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+ new JArrayList[Accumulator](2)
+ }
+
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupingKeys)
+ aggregateBuffer = new Row(rowTimeFieldPos + 2)
+
+ // init lists with two empty accumulators
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).add(accumulator)
+ accumulatorList(i).add(accumulator)
+ }
+ }
+
+ /**
+ * For sub-grouped intermediate aggregate Rows, divide window based on the rowtime
+ * (current'rowtime - previous’rowtime > gap), and then merge data (within a unified window)
+ * into an aggregate buffer.
+ *
+ * @param records Sub-grouped intermediate aggregate Rows.
+ * @return Combined intermediate aggregate Row.
+ *
+ */
+ override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
+ preProcessing(records, out)
+ }
+
+ /**
+ * Divide window based on the rowtime
+ * (current'rowtime - previous’rowtime > gap), and then merge data (within a unified window)
+ * into an aggregate buffer.
+ *
+ * @param records Intermediate aggregate Rows.
+ * @return Pre partition intermediate aggregate Row.
+ *
+ */
+ override def mapPartition(records: Iterable[Row], out: Collector[Row]): Unit = {
+ preProcessing(records, out)
+ }
+
+ /**
+ * Intermediate aggregate Rows, divide window based on the rowtime
+ * (current'rowtime - previous’rowtime > gap), and then merge data (within a unified window)
+ * into an aggregate buffer.
+ *
+ * @param records Intermediate aggregate Rows.
+ * @return PreProcessing intermediate aggregate Row.
+ *
+ */
+ private def preProcessing(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var windowStart: java.lang.Long = null
+ var windowEnd: java.lang.Long = null
+ var currentRowTime: java.lang.Long = null
+
+ // reset first accumulator in merge list
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).set(0, accumulator)
+ }
+
+ val iterator = records.iterator()
+
+ while (iterator.hasNext) {
+ val record = iterator.next()
+ currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+ // initial traversal or opening a new window
+ if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
+
+ // calculate the current window and open a new window.
+ if (windowEnd != null) {
+ // emit the current window's merged data
+ doCollect(out, accumulatorList, windowStart, windowEnd)
+
+ // reset first value of accumulator list
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).set(0, accumulator)
+ }
+ } else {
+ // set group keys to aggregateBuffer.
+ for (i <- groupingKeys.indices) {
+ aggregateBuffer.setField(i, record.getField(i))
+ }
+ }
+
+ windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+ }
+
+ for (i <- aggregates.indices) {
+ // insert received accumulator into acc list
+ val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
+ accumulatorList(i).set(1, newAcc)
+ // merge acc list
+ val retAcc = aggregates(i).merge(accumulatorList(i))
+ // insert result into acc list
+ accumulatorList(i).set(0, retAcc)
+ }
+
+ // the current rowtime is the last rowtime of the next calculation.
+ windowEnd = currentRowTime + gap
+ }
+ // emit the merged data of the current window.
+ doCollect(out, accumulatorList, windowStart, windowEnd)
+ }
+
+ /**
+ * Emit the merged data of the current window.
+ *
+ * @param out the collection of the aggregate results
+ * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
+ * each aggregate
+ * @param windowStart the window's start attribute value is the min (rowtime)
+ * of all rows in the window.
+ * @param windowEnd the window's end property value is max (rowtime) + gap
+ * for all rows in the window.
+ */
+ def doCollect(
+ out: Collector[Row],
+ accumulatorList: Array[JArrayList[Accumulator]],
+ windowStart: Long,
+ windowEnd: Long): Unit = {
+
+ // merge the accumulators into one accumulator
+ for (i <- aggregates.indices) {
+ aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+ }
+
+ // intermediate Row WindowStartPos is rowtime pos.
+ aggregateBuffer.setField(rowTimeFieldPos, windowStart)
+
+ // intermediate Row WindowEndPos is rowtime pos + 1.
+ aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
+
+ out.collect(aggregateBuffer)
+ }
+
+ override def getProducedType: TypeInformation[Row] = {
+ intermediateRowType
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index 071f0ee..882f4b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -42,7 +42,7 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
(1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
(2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
(3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
- (6L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+ (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
(4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
(16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
(8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"))
@@ -152,23 +152,28 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
"Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" +
"Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" +
- "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.013\n" +
+ "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" +
"Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test(expected = classOf[UnsupportedOperationException])
- def testAlldEventTimeSessionGroupWindow(): Unit = {
- // Non-grouping Session window on event-time are currently not supported
+ @Test
+ def testAllEventTimeSessionGroupWindow(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val table = env
.fromCollection(data)
.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
- val windowedTable =table
- .window(Session withGap 7.milli on 'long as 'w)
+
+ val results =table
+ .window(Session withGap 2.milli on 'long as 'w)
.groupBy('w)
- .select('string.count).toDataSet[Row].collect()
+ .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect()
+
+ val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" +
+ "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" +
+ "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test(expected = classOf[ValidationException])