Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 17:07:11 UTC

flink git commit: [FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.

Repository: flink
Updated Branches:
  refs/heads/master 31a57c5a8 -> adbf846f2


[FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.

This closes #3489.
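
For context on why this conversion matters: a Scala for comprehension over
indices desugars to Range.foreach, which invokes a closure per iteration,
while an explicit while loop compiles down to a plain JVM counter loop. That
difference shows up in per-record hot paths like these aggregation functions.
A minimal standalone sketch of the pattern applied throughout this patch
(illustrative names, not the Flink sources):

// Before: the for comprehension goes through Range.foreach and a closure
// on every call.
def sumFor(values: Array[Int]): Int = {
  var sum = 0
  for (i <- values.indices) {
    sum += values(i)
  }
  sum
}

// After: the explicit while loop compiles to a plain counter loop.
def sumWhile(values: Array[Int]): Int = {
  var sum = 0
  var i = 0
  while (i < values.length) {
    sum += values(i)
    i += 1
  }
  sum
}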


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adbf846f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adbf846f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adbf846f

Branch: refs/heads/master
Commit: adbf846f23881b98ab9dc5886a0b066b8aa1ded6
Parents: 31a57c5
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 7 22:17:54 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 17:59:31 2017 +0100

----------------------------------------------------------------------
 .../aggregate/AggregateAggFunction.scala        | 29 ++++++-----
 .../aggregate/AggregateMapFunction.scala        | 17 ++++---
 .../AggregateReduceCombineFunction.scala        | 16 ++++--
 .../AggregateReduceGroupFunction.scala          | 52 ++++++++++++--------
 ...SetSessionWindowAggReduceGroupFunction.scala | 36 +++++++++-----
 ...aSetSessionWindowAggregatePreProcessor.scala | 27 +++++++---
 ...umbleCountWindowAggReduceGroupFunction.scala | 32 +++++++-----
 ...mbleTimeWindowAggReduceCombineFunction.scala | 16 ++++--
 ...TumbleTimeWindowAggReduceGroupFunction.scala | 33 ++++++++-----
 .../aggregate/DataSetWindowAggMapFunction.scala | 23 +++++----
 .../IncrementalAggregateAllWindowFunction.scala |  2 +-
 .../IncrementalAggregateWindowFunction.scala    |  8 ++-
 ...UnboundedProcessingOverProcessFunction.scala |  6 ++-
 13 files changed, 193 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 4d1579b..11d55e5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -36,43 +36,50 @@ class AggregateAggFunction(
     private val aggFields: Array[Int])
   extends DataStreamAggFunc[Row, Row, Row] {
 
-  val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
-
   override def createAccumulator(): Row = {
     val accumulatorRow: Row = new Row(aggregates.length)
-    aggsWithIdx.foreach { case (agg, i) =>
-      accumulatorRow.setField(i, agg.createAccumulator())
+    var i = 0
+    while (i < aggregates.length) {
+      accumulatorRow.setField(i, aggregates(i).createAccumulator())
+      i += 1
     }
     accumulatorRow
   }
 
-  override def add(value: Row, accumulatorRow: Row) = {
+  override def add(value: Row, accumulatorRow: Row): Unit = {
 
-    aggsWithIdx.foreach { case (agg, i) =>
+    var i = 0
+    while (i < aggregates.length) {
       val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
       val v = value.getField(aggFields(i))
-      agg.accumulate(acc, v)
+      aggregates(i).accumulate(acc, v)
+      i += 1
     }
   }
 
   override def getResult(accumulatorRow: Row): Row = {
     val output = new Row(aggFields.length)
 
-    aggsWithIdx.foreach { case (agg, i) =>
-      output.setField(i, agg.getValue(accumulatorRow.getField(i).asInstanceOf[Accumulator]))
+    var i = 0
+    while (i < aggregates.length) {
+      val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
+      output.setField(i, aggregates(i).getValue(acc))
+      i += 1
     }
     output
   }
 
   override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
 
-    aggsWithIdx.foreach { case (agg, i) =>
+    var i = 0
+    while (i < aggregates.length) {
       val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator]
       val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator]
       val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
       accumulators.add(aAcc)
       accumulators.add(bAcc)
-      aAccumulatorRow.setField(i, agg.merge(accumulators))
+      aAccumulatorRow.setField(i, aggregates(i).merge(accumulators))
+      i += 1
     }
     aAccumulatorRow
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
index d936fbb..ba1bb1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
@@ -32,28 +32,33 @@ class AggregateMapFunction[IN, OUT](
     @transient private val returnType: TypeInformation[OUT])
   extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private val partialRowLength = groupingKeys.length + aggregates.length
   private var output: Row = _
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.length == aggFields.length)
-    val partialRowLength = groupingKeys.length + aggregates.length
     output = new Row(partialRowLength)
   }
 
   override def map(value: IN): OUT = {
 
     val input = value.asInstanceOf[Row]
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val agg = aggregates(i)
       val accumulator = agg.createAccumulator()
       agg.accumulate(accumulator, input.getField(aggFields(i)))
       output.setField(groupingKeys.length + i, accumulator)
+      i += 1
     }
 
-    for (i <- groupingKeys.indices) {
+    i = 0
+    while (i < groupingKeys.length) {
       output.setField(i, input.getField(groupingKeys(i)))
+      i += 1
     }
     output.asInstanceOf[OUT]
   }
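
Besides the loop conversion, the hunk above also moves the Preconditions
checks and the partialRowLength computation out of open() and into the class
body, so they execute once when the function object is constructed instead of
on every open() call. A minimal standalone sketch of that pattern, with
illustrative names (not the Flink classes):

class PartialRowMapper(aggFields: Array[Int], groupingKeys: Array[Int]) {
  // Statements in the class body run as part of the constructor, so the
  // validation happens once, when the function is instantiated.
  require(aggFields != null, "aggFields must not be null")
  require(groupingKeys != null, "groupingKeys must not be null")

  // Derived once here; previously recomputed inside open().
  private val partialRowLength = groupingKeys.length + aggFields.length
}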

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index 6b95cb8..33ededa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -73,15 +73,18 @@ class AggregateReduceCombineFunction(
     val iterator = records.iterator()
 
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     while (iterator.hasNext) {
       val record = iterator.next()
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -89,19 +92,24 @@ class AggregateReduceCombineFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       last = record
     }
 
     // set the partial merged result to the aggregateBuffer
-    for (i <- aggregates.indices) {
+    i = 0
+    while (i < aggregates.length) {
       preAggOutput.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+      i += 1
     }
 
     // set group keys to aggregateBuffer.
-    for (i <- groupKeysMapping.indices) {
+    i = 0
+    while (i < groupKeysMapping.length) {
       preAggOutput.setField(i, last.getField(i))
+      i += 1
     }
 
     preAggOutput
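
The reduce/combine hunks in this patch all share the same merge idiom: each
aggregate keeps a reusable two-slot accumulator list, with slot 0 holding the
running result and slot 1 the incoming accumulator, so merge() can be driven
from a while loop without allocating per record. A standalone sketch of the
idiom under a simplified trait (illustrative names, not the Flink API):

import java.util.{ArrayList => JArrayList, List => JList}

trait Agg[ACC] {
  def createAccumulator(): ACC
  def merge(accumulators: JList[ACC]): ACC
}

def mergeAll[ACC](agg: Agg[ACC], incoming: Iterator[ACC]): ACC = {
  val slots = new JArrayList[ACC](2)
  slots.add(agg.createAccumulator()) // slot 0: running result
  slots.add(agg.createAccumulator()) // slot 1: placeholder, overwritten below
  while (incoming.hasNext) {
    slots.set(1, incoming.next())    // insert received accumulator
    slots.set(0, agg.merge(slots))   // fold the pair back into slot 0
  }
  slots.get(0)
}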

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 2f75cd7..337f1dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -48,26 +48,27 @@ class AggregateReduceGroupFunction(
     private val finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var output: Row = _
-  private var intermediateGroupKeys: Option[Array[Int]] = None
+  private val intermediateGroupKeys: Option[Array[Int]] =
+    if (groupingSetsMapping.nonEmpty) { Some(groupKeysMapping.map(_._1)) } else { None }
 
   val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
     new JArrayList[Accumulator](2)
   }
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     output = new Row(finalRowArity)
-    if (!groupingSetsMapping.isEmpty) {
-      intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
-    }
 
     // init lists with two empty accumulators
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).add(accumulator)
       accumulatorList(i).add(accumulator)
+      i += 1
     }
   }
 
@@ -86,15 +87,18 @@ class AggregateReduceGroupFunction(
     val iterator = records.iterator()
 
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     while (iterator.hasNext) {
       val record = iterator.next()
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -102,31 +106,37 @@ class AggregateReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       last = record
     }
 
     // Set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.getField(previous))
+    i = 0
+    while (i < groupKeysMapping.length) {
+      val (after, previous) = groupKeysMapping(i)
+      output.setField(after, last.getField(previous))
+      i += 1
     }
 
     // get final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) => {
-        val agg = aggregates(previous)
-        val result = agg.getValue(accumulatorList(previous).get(0))
-        output.setField(after, result)
-      }
+    i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      val result = agg.getValue(accumulatorList(previous).get(0))
+      output.setField(after, result)
+      i += 1
     }
 
     // Evaluate additional values of grouping sets
     if (intermediateGroupKeys.isDefined) {
-      groupingSetsMapping.foreach {
-        case (inputIndex, outputIndex) =>
-          output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+      i = 0
+      while (i < groupingSetsMapping.length) {
+        val (inputIndex, outputIndex) = groupingSetsMapping(i)
+        output.setField(outputIndex, !intermediateGroupKeys.get.contains(inputIndex))
+        i += 1
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 1f19687..9c962ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -61,6 +61,9 @@ class DataSetSessionWindowAggReduceGroupFunction(
     isInputCombined: Boolean)
   extends RichGroupReduceFunction[Row, Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var aggregateBuffer: Row = _
   private var output: Row = _
   private var collector: TimeWindowPropertyCollector = _
@@ -74,8 +77,6 @@ class DataSetSessionWindowAggReduceGroupFunction(
   }
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
     collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
@@ -104,10 +105,13 @@ class DataSetSessionWindowAggReduceGroupFunction(
     var windowEnd: java.lang.Long = null
     var currentRowTime: java.lang.Long = null
 
+
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     val iterator = records.iterator()
@@ -125,22 +129,27 @@ class DataSetSessionWindowAggReduceGroupFunction(
           doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
 
           // reset first accumulator in list
-          for (i <- aggregates.indices) {
+          i = 0
+          while (i < aggregates.length) {
             val accumulator = aggregates(i).createAccumulator()
             accumulatorList(i).set(0, accumulator)
+            i += 1
           }
         } else {
           // set group keys value to final output.
-          groupKeysMapping.foreach {
-            case (after, previous) =>
-              output.setField(after, record.getField(previous))
+          i = 0
+          while (i < groupKeysMapping.length) {
+            val (after, previous) = groupKeysMapping(i)
+            output.setField(after, record.getField(previous))
+            i += 1
           }
         }
 
         windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
       }
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -148,6 +157,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       windowEnd = if (isInputCombined) {
@@ -180,10 +190,12 @@ class DataSetSessionWindowAggReduceGroupFunction(
       windowEnd: Long): Unit = {
 
     // merge the accumulators and then get value for the final output
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        val agg = aggregates(previous)
-        output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+    var i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+      i += 1
     }
 
     // adds TimeWindow properties to output then emit output

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index a299c40..b99c83e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.aggregate
 import java.lang.Iterable
 import java.util.{ArrayList => JArrayList}
 
-import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.types.Row
@@ -47,6 +47,9 @@ class DataSetSessionWindowAggregatePreProcessor(
   with GroupCombineFunction[Row,Row]
   with ResultTypeQueryable[Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupingKeys)
+
   private var aggregateBuffer: Row = _
   private val accumStartPos: Int = groupingKeys.length
   private val rowTimeFieldPos = accumStartPos + aggregates.length
@@ -56,8 +59,6 @@ class DataSetSessionWindowAggregatePreProcessor(
   }
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupingKeys)
     aggregateBuffer = new Row(rowTimeFieldPos + 2)
 
     // init lists with two empty accumulators
@@ -110,9 +111,11 @@ class DataSetSessionWindowAggregatePreProcessor(
     var currentRowTime: java.lang.Long = null
 
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     val iterator = records.iterator()
@@ -129,21 +132,26 @@ class DataSetSessionWindowAggregatePreProcessor(
           doCollect(out, accumulatorList, windowStart, windowEnd)
 
           // reset first value of accumulator list
-          for (i <- aggregates.indices) {
+          i = 0
+          while (i < aggregates.length) {
             val accumulator = aggregates(i).createAccumulator()
             accumulatorList(i).set(0, accumulator)
+            i += 1
           }
         } else {
           // set group keys to aggregateBuffer.
-          for (i <- groupingKeys.indices) {
+          i = 0
+          while (i < groupingKeys.length) {
             aggregateBuffer.setField(i, record.getField(i))
+            i += 1
           }
         }
 
         windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
       }
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -151,6 +159,7 @@ class DataSetSessionWindowAggregatePreProcessor(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       // the current rowtime is the last rowtime of the next calculation.
@@ -178,8 +187,10 @@ class DataSetSessionWindowAggregatePreProcessor(
       windowEnd: Long): Unit = {
 
     // merge the accumulators into one accumulator
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+      i += 1
     }
 
     // intermediate Row WindowStartPos is rowtime pos.

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index ecc945c..7022c65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -47,6 +47,9 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     private val finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var output: Row = _
   private val accumStartPos: Int = groupKeysMapping.length
 
@@ -55,8 +58,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
   }
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     output = new Row(finalRowArity)
 
     // init lists with two empty accumulators
@@ -71,21 +72,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
 
     var count: Long = 0
     val iterator = records.iterator()
+    var i = 0
 
     while (iterator.hasNext) {
 
       if (count == 0) {
         // reset first accumulator
-        for (i <- aggregates.indices) {
+        i = 0
+        while (i < aggregates.length) {
           val accumulator = aggregates(i).createAccumulator()
           accumulatorList(i).set(0, accumulator)
+          i += 1
         }
       }
 
       val record = iterator.next()
       count += 1
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -93,20 +98,25 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       if (windowSize == count) {
         // set group keys value to final output.
-        groupKeysMapping.foreach {
-          case (after, previous) =>
-            output.setField(after, record.getField(previous))
+        i = 0
+        while (i < groupKeysMapping.length) {
+          val (after, previous) = groupKeysMapping(i)
+          output.setField(after, record.getField(previous))
+          i += 1
         }
 
         // merge the accumulators and then get value for the final output
-        aggregateMapping.foreach {
-          case (after, previous) =>
-            val agg = aggregates(previous)
-            output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+        i = 0
+        while (i < aggregateMapping.length) {
+          val (after, previous) = aggregateMapping(i)
+          val agg = aggregates(previous)
+          output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+          i += 1
         }
 
         // emit the output

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index df8bed9..c618325 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -70,15 +70,18 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
     val iterator = records.iterator()
 
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     while (iterator.hasNext) {
       val record = iterator.next()
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -86,19 +89,24 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       last = record
     }
 
     // set the partial merged result to the aggregateBuffer
-    for (i <- aggregates.indices) {
+    i = 0
+    while (i < aggregates.length) {
       aggregateBuffer.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+      i += 1
     }
 
     // set group keys to aggregateBuffer.
-    for (i <- groupKeysMapping.indices) {
+    i = 0
+    while (i < groupKeysMapping.length) {
       aggregateBuffer.setField(i, last.getField(i))
+      i += 1
     }
 
     // set the rowtime attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 674c078..f85f2cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -51,6 +51,9 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
   private var collector: TimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = _
   private var output: Row = _
@@ -64,8 +67,6 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   }
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
     aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
     collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
@@ -84,15 +85,18 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     val iterator = records.iterator()
 
     // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val accumulator = aggregates(i).createAccumulator()
       accumulatorList(i).set(0, accumulator)
+      i += 1
     }
 
     while (iterator.hasNext) {
       val record = iterator.next()
 
-      for (i <- aggregates.indices) {
+      i = 0
+      while (i < aggregates.length) {
         // insert received accumulator into acc list
         val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
         accumulatorList(i).set(1, newAcc)
@@ -100,23 +104,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
         val retAcc = aggregates(i).merge(accumulatorList(i))
         // insert result into acc list
         accumulatorList(i).set(0, retAcc)
+        i += 1
       }
 
       last = record
     }
 
     // set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.getField(previous))
+    i = 0
+    while (i < groupKeysMapping.length) {
+      val (after, previous) = groupKeysMapping(i)
+      output.setField(after, last.getField(previous))
+      i += 1
     }
 
     // get final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        val agg = aggregates(previous)
-        val result = agg.getValue(accumulatorList(previous).get(0))
-        output.setField(after, result)
+    i = 0
+    while (i < aggregateMapping.length) {
+      val (after, previous) = aggregateMapping(i)
+      val agg = aggregates(previous)
+      val result = agg.getValue(accumulatorList(previous).get(0))
+      output.setField(after, result)
+      i += 1
     }
 
     // get window start timestamp

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
index 4a64c47..4707a4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -44,33 +44,36 @@ class DataSetWindowAggMapFunction(
     @transient private val returnType: TypeInformation[Row])
   extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
 
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
   private var output: Row = _
+  // add one more arity to store rowtime
+  private val partialRowLength = groupingKeys.length + aggregates.length + 1
   // rowtime index in the buffer output row
-  private var rowtimeIndex: Int = _
+  private val rowtimeIndex: Int = partialRowLength - 1
 
   override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.length == aggFields.length)
-    // add one more arity to store rowtime
-    val partialRowLength = groupingKeys.length + aggregates.length + 1
-    // set rowtime to the last field of the output row
-    rowtimeIndex = partialRowLength - 1
     output = new Row(partialRowLength)
   }
 
   override def map(input: Row): Row = {
 
-    for (i <- aggregates.indices) {
+    var i = 0
+    while (i < aggregates.length) {
       val agg = aggregates(i)
       val fieldValue = input.getField(aggFields(i))
       val accumulator = agg.createAccumulator()
       agg.accumulate(accumulator, fieldValue)
       output.setField(groupingKeys.length + i, accumulator)
+      i += 1
     }
 
-    for (i <- groupingKeys.indices) {
+    i = 0
+    while (i < groupingKeys.length) {
       output.setField(i, input.getField(groupingKeys(i)))
+      i += 1
     }
 
     val timeField = input.getField(timeFieldPos)

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 13ac6a9..bfab00b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,7 +23,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
 
 /**
  * Computes the final aggregate value from incrementally computed aggregates.

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index a4d4837..983efb3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -60,11 +60,15 @@ class IncrementalAggregateWindowFunction[W <: Window](
     if (iterator.hasNext) {
       val record = iterator.next()
 
-      for (i <- 0 until numGroupingKey) {
+      var i = 0
+      while (i < numGroupingKey) {
         output.setField(i, key.getField(i))
+        i += 1
       }
-      for (i <- 0 until numAggregates) {
+      i = 0
+      while (i < numAggregates) {
         output.setField(numGroupingKey + i, record.getField(i))
+        i += 1
       }
 
       out.collect(output)

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
index 058b4a7..41f8e8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -52,18 +52,20 @@ class UnboundedProcessingOverProcessFunction(
     ctx: ProcessFunction[Row, Row]#Context,
     out: Collector[Row]): Unit = {
 
+    var i = 0
+
     var accumulators = state.value()
 
     if (null == accumulators) {
       accumulators = new Row(aggregates.length)
-      var i = 0
+      i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
     }
 
-    var i = 0
+    i = 0
     while (i < forwardedFieldCount) {
       output.setField(i, input.getField(i))
       i += 1