Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 17:07:11 UTC
flink git commit: [FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.
Repository: flink
Updated Branches:
refs/heads/master 31a57c5a8 -> adbf846f2
[FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.
This closes #3489.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adbf846f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adbf846f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adbf846f
Branch: refs/heads/master
Commit: adbf846f23881b98ab9dc5886a0b066b8aa1ded6
Parents: 31a57c5
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 7 22:17:54 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 17:59:31 2017 +0100
----------------------------------------------------------------------
.../aggregate/AggregateAggFunction.scala | 29 ++++++-----
.../aggregate/AggregateMapFunction.scala | 17 ++++---
.../AggregateReduceCombineFunction.scala | 16 ++++--
.../AggregateReduceGroupFunction.scala | 52 ++++++++++++--------
...SetSessionWindowAggReduceGroupFunction.scala | 36 +++++++++-----
...aSetSessionWindowAggregatePreProcessor.scala | 27 +++++++---
...umbleCountWindowAggReduceGroupFunction.scala | 32 +++++++-----
...mbleTimeWindowAggReduceCombineFunction.scala | 16 ++++--
...TumbleTimeWindowAggReduceGroupFunction.scala | 33 ++++++++-----
.../aggregate/DataSetWindowAggMapFunction.scala | 23 +++++----
.../IncrementalAggregateAllWindowFunction.scala | 2 +-
.../IncrementalAggregateWindowFunction.scala | 8 ++-
...UnboundedProcessingOverProcessFunction.scala | 6 ++-
13 files changed, 193 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
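Every hunk below applies the same mechanical rewrite: a Scala for comprehension over indices desugars to a Range.foreach call that allocates a closure and makes a method call per element, so on these per-record hot paths it is replaced with a plain while loop over a local counter, which compiles to a simple jump-based loop. A minimal sketch of the before/after shape (field names borrowed from AggregateAggFunction below):

  // Before: index-based for comprehension; desugars to
  // aggregates.indices.foreach { i => ... }, one closure call per element.
  for (i <- aggregates.indices) {
    accumulatorRow.setField(i, aggregates(i).createAccumulator())
  }

  // After: explicit while loop with a local mutable counter; no closure
  // allocation, straight-line bytecode in the per-record path.
  var i = 0
  while (i < aggregates.length) {
    accumulatorRow.setField(i, aggregates(i).createAccumulator())
    i += 1
  }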
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 4d1579b..11d55e5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -36,43 +36,50 @@ class AggregateAggFunction(
private val aggFields: Array[Int])
extends DataStreamAggFunc[Row, Row, Row] {
- val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
-
override def createAccumulator(): Row = {
val accumulatorRow: Row = new Row(aggregates.length)
- aggsWithIdx.foreach { case (agg, i) =>
- accumulatorRow.setField(i, agg.createAccumulator())
+ var i = 0
+ while (i < aggregates.length) {
+ accumulatorRow.setField(i, aggregates(i).createAccumulator())
+ i += 1
}
accumulatorRow
}
- override def add(value: Row, accumulatorRow: Row) = {
+ override def add(value: Row, accumulatorRow: Row): Unit = {
- aggsWithIdx.foreach { case (agg, i) =>
+ var i = 0
+ while (i < aggregates.length) {
val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
val v = value.getField(aggFields(i))
- agg.accumulate(acc, v)
+ aggregates(i).accumulate(acc, v)
+ i += 1
}
}
override def getResult(accumulatorRow: Row): Row = {
val output = new Row(aggFields.length)
- aggsWithIdx.foreach { case (agg, i) =>
- output.setField(i, agg.getValue(accumulatorRow.getField(i).asInstanceOf[Accumulator]))
+ var i = 0
+ while (i < aggregates.length) {
+ val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
+ output.setField(i, aggregates(i).getValue(acc))
+ i += 1
}
output
}
override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
- aggsWithIdx.foreach { case (agg, i) =>
+ var i = 0
+ while (i < aggregates.length) {
val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator]
val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator]
val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
accumulators.add(aAcc)
accumulators.add(bAcc)
- aAccumulatorRow.setField(i, agg.merge(accumulators))
+ aAccumulatorRow.setField(i, aggregates(i).merge(accumulators))
+ i += 1
}
aAccumulatorRow
}
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
index d936fbb..ba1bb1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
@@ -32,28 +32,33 @@ class AggregateMapFunction[IN, OUT](
@transient private val returnType: TypeInformation[OUT])
extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+ private val partialRowLength = groupingKeys.length + aggregates.length
private var output: Row = _
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(aggFields)
- Preconditions.checkArgument(aggregates.length == aggFields.length)
- val partialRowLength = groupingKeys.length + aggregates.length
output = new Row(partialRowLength)
}
override def map(value: IN): OUT = {
val input = value.asInstanceOf[Row]
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val agg = aggregates(i)
val accumulator = agg.createAccumulator()
agg.accumulate(accumulator, input.getField(aggFields(i)))
output.setField(groupingKeys.length + i, accumulator)
+ i += 1
}
- for (i <- groupingKeys.indices) {
+ i = 0
+ while (i < groupingKeys.length) {
output.setField(i, input.getField(groupingKeys(i)))
+ i += 1
}
output.asInstanceOf[OUT]
}
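Besides the loop rewrite, several functions in this commit (starting with AggregateMapFunction above) also hoist their Preconditions checks and derived constants out of open() and into the class body. Since these function objects are instantiated on the client while the job graph is built, the hoisted checks fail at construction time rather than at task startup, and values such as partialRowLength are computed once instead of on every open() call. A minimal sketch of the hoisting, reusing the fields shown above:

  // Before: validated lazily inside open(config), once per task instance.
  // After: validated eagerly in the class body, when the function object
  // is constructed during job-graph assembly on the client.
  Preconditions.checkNotNull(aggregates)
  Preconditions.checkNotNull(aggFields)
  Preconditions.checkArgument(aggregates.length == aggFields.length)

  private val partialRowLength = groupingKeys.length + aggregates.length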
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index 6b95cb8..33ededa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -73,15 +73,18 @@ class AggregateReduceCombineFunction(
val iterator = records.iterator()
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
while (iterator.hasNext) {
val record = iterator.next()
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -89,19 +92,24 @@ class AggregateReduceCombineFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
last = record
}
// set the partial merged result to the aggregateBuffer
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
preAggOutput.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+ i += 1
}
// set group keys to aggregateBuffer.
- for (i <- groupKeysMapping.indices) {
+ i = 0
+ while (i < groupKeysMapping.length) {
preAggOutput.setField(i, last.getField(i))
+ i += 1
}
preAggOutput
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 2f75cd7..337f1dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -48,26 +48,27 @@ class AggregateReduceGroupFunction(
private val finalRowArity: Int)
extends RichGroupReduceFunction[Row, Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+
private var output: Row = _
- private var intermediateGroupKeys: Option[Array[Int]] = None
+ private val intermediateGroupKeys: Option[Array[Int]] =
+ if (groupingSetsMapping.nonEmpty) { Some(groupKeysMapping.map(_._1)) } else { None }
val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
new JArrayList[Accumulator](2)
}
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
output = new Row(finalRowArity)
- if (!groupingSetsMapping.isEmpty) {
- intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
- }
// init lists with two empty accumulators
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).add(accumulator)
accumulatorList(i).add(accumulator)
+ i += 1
}
}
@@ -86,15 +87,18 @@ class AggregateReduceGroupFunction(
val iterator = records.iterator()
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
while (iterator.hasNext) {
val record = iterator.next()
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -102,31 +106,37 @@ class AggregateReduceGroupFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
last = record
}
// Set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, last.getField(previous))
+ i = 0
+ while (i < groupKeysMapping.length) {
+ val (after, previous) = groupKeysMapping(i)
+ output.setField(after, last.getField(previous))
+ i += 1
}
// get final aggregate value and set to output.
- aggregateMapping.foreach {
- case (after, previous) => {
- val agg = aggregates(previous)
- val result = agg.getValue(accumulatorList(previous).get(0))
- output.setField(after, result)
- }
+ i = 0
+ while (i < aggregateMapping.length) {
+ val (after, previous) = aggregateMapping(i)
+ val agg = aggregates(previous)
+ val result = agg.getValue(accumulatorList(previous).get(0))
+ output.setField(after, result)
+ i += 1
}
// Evaluate additional values of grouping sets
if (intermediateGroupKeys.isDefined) {
- groupingSetsMapping.foreach {
- case (inputIndex, outputIndex) =>
- output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+ i = 0
+ while (i < groupingSetsMapping.length) {
+ val (inputIndex, outputIndex) = groupingSetsMapping(i)
+ output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+ i += 1
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 1f19687..9c962ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -61,6 +61,9 @@ class DataSetSessionWindowAggReduceGroupFunction(
isInputCombined: Boolean)
extends RichGroupReduceFunction[Row, Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+
private var aggregateBuffer: Row = _
private var output: Row = _
private var collector: TimeWindowPropertyCollector = _
@@ -74,8 +77,6 @@ class DataSetSessionWindowAggReduceGroupFunction(
}
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
aggregateBuffer = new Row(intermediateRowArity)
output = new Row(finalRowArity)
collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
@@ -104,10 +105,13 @@ class DataSetSessionWindowAggReduceGroupFunction(
var windowEnd: java.lang.Long = null
var currentRowTime: java.lang.Long = null
+
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
val iterator = records.iterator()
@@ -125,22 +129,27 @@ class DataSetSessionWindowAggReduceGroupFunction(
doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
// reset first accumulator in list
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
} else {
// set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, record.getField(previous))
+ i = 0
+ while (i < groupKeysMapping.length) {
+ val (after, previous) = groupKeysMapping(i)
+ output.setField(after, record.getField(previous))
+ i += 1
}
}
windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
}
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -148,6 +157,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
windowEnd = if (isInputCombined) {
@@ -180,10 +190,12 @@ class DataSetSessionWindowAggReduceGroupFunction(
windowEnd: Long): Unit = {
// merge the accumulators and then get value for the final output
- aggregateMapping.foreach {
- case (after, previous) =>
- val agg = aggregates(previous)
- output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+ var i = 0
+ while (i < aggregateMapping.length) {
+ val (after, previous) = aggregateMapping(i)
+ val agg = aggregates(previous)
+ output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+ i += 1
}
// adds TimeWindow properties to output then emit output
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index a299c40..b99c83e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.aggregate
import java.lang.Iterable
import java.util.{ArrayList => JArrayList}
-import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.types.Row
@@ -47,6 +47,9 @@ class DataSetSessionWindowAggregatePreProcessor(
with GroupCombineFunction[Row,Row]
with ResultTypeQueryable[Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupingKeys)
+
private var aggregateBuffer: Row = _
private val accumStartPos: Int = groupingKeys.length
private val rowTimeFieldPos = accumStartPos + aggregates.length
@@ -56,8 +59,6 @@ class DataSetSessionWindowAggregatePreProcessor(
}
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupingKeys)
aggregateBuffer = new Row(rowTimeFieldPos + 2)
// init lists with two empty accumulators
@@ -110,9 +111,11 @@ class DataSetSessionWindowAggregatePreProcessor(
var currentRowTime: java.lang.Long = null
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
val iterator = records.iterator()
@@ -129,21 +132,26 @@ class DataSetSessionWindowAggregatePreProcessor(
doCollect(out, accumulatorList, windowStart, windowEnd)
// reset first value of accumulator list
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
} else {
// set group keys to aggregateBuffer.
- for (i <- groupingKeys.indices) {
+ i = 0
+ while (i < groupingKeys.length) {
aggregateBuffer.setField(i, record.getField(i))
+ i += 1
}
}
windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
}
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -151,6 +159,7 @@ class DataSetSessionWindowAggregatePreProcessor(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
// the current rowtime is the last rowtime of the next calculation.
@@ -178,8 +187,10 @@ class DataSetSessionWindowAggregatePreProcessor(
windowEnd: Long): Unit = {
// merge the accumulators into one accumulator
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+ i += 1
}
// intermediate Row WindowStartPos is rowtime pos.
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index ecc945c..7022c65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -47,6 +47,9 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
private val finalRowArity: Int)
extends RichGroupReduceFunction[Row, Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+
private var output: Row = _
private val accumStartPos: Int = groupKeysMapping.length
@@ -55,8 +58,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
}
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
output = new Row(finalRowArity)
// init lists with two empty accumulators
@@ -71,21 +72,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
var count: Long = 0
val iterator = records.iterator()
+ var i = 0
while (iterator.hasNext) {
if (count == 0) {
// reset first accumulator
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
}
val record = iterator.next()
count += 1
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -93,20 +98,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
if (windowSize == count) {
// set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, record.getField(previous))
+ i = 0
+ while (i < groupKeysMapping.length) {
+ val (after, previous) = groupKeysMapping(i)
+ output.setField(after, record.getField(previous))
+ i += 1
}
// merge the accumulators and then get value for the final output
- aggregateMapping.foreach {
- case (after, previous) =>
- val agg = aggregates(previous)
- output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+ i = 0
+ while (i < aggregateMapping.length) {
+ val (after, previous) = aggregateMapping(i)
+ val agg = aggregates(previous)
+ output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+ i += 1
}
// emit the output
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index df8bed9..c618325 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -70,15 +70,18 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
val iterator = records.iterator()
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
while (iterator.hasNext) {
val record = iterator.next()
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -86,19 +89,24 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
last = record
}
// set the partial merged result to the aggregateBuffer
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
aggregateBuffer.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+ i += 1
}
// set group keys to aggregateBuffer.
- for (i <- groupKeysMapping.indices) {
+ i = 0
+ while (i < groupKeysMapping.length) {
aggregateBuffer.setField(i, last.getField(i))
+ i += 1
}
// set the rowtime attribute
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 674c078..f85f2cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -51,6 +51,9 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
finalRowArity: Int)
extends RichGroupReduceFunction[Row, Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+
private var collector: TimeWindowPropertyCollector = _
protected var aggregateBuffer: Row = _
private var output: Row = _
@@ -64,8 +67,6 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
}
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
aggregateBuffer = new Row(intermediateRowArity)
output = new Row(finalRowArity)
collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
@@ -84,15 +85,18 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
val iterator = records.iterator()
// reset first accumulator in merge list
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val accumulator = aggregates(i).createAccumulator()
accumulatorList(i).set(0, accumulator)
+ i += 1
}
while (iterator.hasNext) {
val record = iterator.next()
- for (i <- aggregates.indices) {
+ i = 0
+ while (i < aggregates.length) {
// insert received accumulator into acc list
val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
accumulatorList(i).set(1, newAcc)
@@ -100,23 +104,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
val retAcc = aggregates(i).merge(accumulatorList(i))
// insert result into acc list
accumulatorList(i).set(0, retAcc)
+ i += 1
}
last = record
}
// set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, last.getField(previous))
+ i = 0
+ while (i < groupKeysMapping.length) {
+ val (after, previous) = groupKeysMapping(i)
+ output.setField(after, last.getField(previous))
+ i += 1
}
// get final aggregate value and set to output.
- aggregateMapping.foreach {
- case (after, previous) =>
- val agg = aggregates(previous)
- val result = agg.getValue(accumulatorList(previous).get(0))
- output.setField(after, result)
+ i = 0
+ while (i < aggregateMapping.length) {
+ val (after, previous) = aggregateMapping(i)
+ val agg = aggregates(previous)
+ val result = agg.getValue(accumulatorList(previous).get(0))
+ output.setField(after, result)
+ i += 1
}
// get window start timestamp
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
index 4a64c47..4707a4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -44,33 +44,36 @@ class DataSetWindowAggMapFunction(
@transient private val returnType: TypeInformation[Row])
extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+
private var output: Row = _
+ // add one more arity to store rowtime
+ private val partialRowLength = groupingKeys.length + aggregates.length + 1
// rowtime index in the buffer output row
- private var rowtimeIndex: Int = _
+ private val rowtimeIndex: Int = partialRowLength - 1
override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(aggFields)
- Preconditions.checkArgument(aggregates.length == aggFields.length)
- // add one more arity to store rowtime
- val partialRowLength = groupingKeys.length + aggregates.length + 1
- // set rowtime to the last field of the output row
- rowtimeIndex = partialRowLength - 1
output = new Row(partialRowLength)
}
override def map(input: Row): Row = {
- for (i <- aggregates.indices) {
+ var i = 0
+ while (i < aggregates.length) {
val agg = aggregates(i)
val fieldValue = input.getField(aggFields(i))
val accumulator = agg.createAccumulator()
agg.accumulate(accumulator, fieldValue)
output.setField(groupingKeys.length + i, accumulator)
+ i += 1
}
- for (i <- groupingKeys.indices) {
+ i = 0
+ while (i < groupingKeys.length) {
output.setField(i, input.getField(groupingKeys(i)))
+ i += 1
}
val timeField = input.getField(timeFieldPos)
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 13ac6a9..bfab00b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,7 +23,7 @@ import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
/**
* Computes the final aggregate value from incrementally computed aggregates.
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index a4d4837..983efb3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -60,11 +60,15 @@ class IncrementalAggregateWindowFunction[W <: Window](
if (iterator.hasNext) {
val record = iterator.next()
- for (i <- 0 until numGroupingKey) {
+ var i = 0
+ while (i < numGroupingKey) {
output.setField(i, key.getField(i))
+ i += 1
}
- for (i <- 0 until numAggregates) {
+ i = 0
+ while (i < numAggregates) {
output.setField(numGroupingKey + i, record.getField(i))
+ i += 1
}
out.collect(output)
http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
index 058b4a7..41f8e8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -52,18 +52,20 @@ class UnboundedProcessingOverProcessFunction(
ctx: ProcessFunction[Row, Row]#Context,
out: Collector[Row]): Unit = {
+ var i = 0
+
var accumulators = state.value()
if (null == accumulators) {
accumulators = new Row(aggregates.length)
- var i = 0
+ i = 0
while (i < aggregates.length) {
accumulators.setField(i, aggregates(i).createAccumulator())
i += 1
}
}
- var i = 0
+ i = 0
while (i < forwardedFieldCount) {
output.setField(i, input.getField(i))
i += 1