Posted to commits@flink.apache.org by fh...@apache.org on 2017/04/06 19:29:14 UTC

[07/12] flink git commit: [FLINK-6257] [table] Consistent naming of ProcessFunction and methods for OVER windows.

[FLINK-6257] [table] Consistent naming of ProcessFunction and methods for OVER windows.

- Add check for sort order of OVER windows.

This closes #3681.
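
For context, the new check rejects OVER windows that are not ordered ascending on a single time attribute. A minimal sketch of a query that the check now rejects (assuming a StreamTableEnvironment named tableEnv; table and field names are hypothetical):

    // hypothetical: MyTable has fields a and b and a 'proctime' time attribute
    val failing = tableEnv.sql(
      "SELECT a, COUNT(b) OVER (PARTITION BY a ORDER BY proctime DESC " +
      "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM MyTable")
    // translation fails with a TableException:
    // "The window can only be ordered in ASCENDING mode."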


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07f1b035
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07f1b035
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07f1b035

Branch: refs/heads/table-retraction
Commit: 07f1b035ffbf07d160503c48e2c58a464ec5d014
Parents: 5ff9c99
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu Apr 6 10:33:30 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 6 16:33:45 2017 +0200

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    | 196 +++++--------
 .../table/runtime/aggregate/AggregateUtil.scala | 122 +++-----
 ...ndedProcessingOverRangeProcessFunction.scala | 183 ------------
 ...oundedProcessingOverRowProcessFunction.scala | 179 ------------
 .../aggregate/ProcTimeBoundedRangeOver.scala    | 182 ++++++++++++
 .../aggregate/ProcTimeBoundedRowsOver.scala     | 179 ++++++++++++
 .../ProcTimeUnboundedNonPartitionedOver.scala   |  96 ++++++
 .../ProcTimeUnboundedPartitionedOver.scala      |  84 ++++++
 .../RangeClauseBoundedOverProcessFunction.scala | 201 -------------
 .../aggregate/RowTimeBoundedRangeOver.scala     | 200 +++++++++++++
 .../aggregate/RowTimeBoundedRowsOver.scala      | 222 ++++++++++++++
 .../aggregate/RowTimeUnboundedOver.scala        | 292 +++++++++++++++++++
 .../RowsClauseBoundedOverProcessFunction.scala  | 222 --------------
 .../UnboundedEventTimeOverProcessFunction.scala | 292 -------------------
 ...rtitionedProcessingOverProcessFunction.scala |  96 ------
 ...UnboundedProcessingOverProcessFunction.scala |  84 ------
 ...ProcessingOverRangeProcessFunctionTest.scala |   2 +-
 17 files changed, 1380 insertions(+), 1452 deletions(-)
----------------------------------------------------------------------
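
For quick reference, the renames map the old ProcessFunction names onto a consistent time-type/bound/clause scheme:

  BoundedProcessingOverRangeProcessFunction            -> ProcTimeBoundedRangeOver
  BoundedProcessingOverRowProcessFunction              -> ProcTimeBoundedRowsOver
  UnboundedProcessingOverProcessFunction               -> ProcTimeUnboundedPartitionedOver
  UnboundedNonPartitionedProcessingOverProcessFunction -> ProcTimeUnboundedNonPartitionedOver
  RangeClauseBoundedOverProcessFunction                -> RowTimeBoundedRangeOver
  RowsClauseBoundedOverProcessFunction                 -> RowTimeBoundedRowsOver
  UnboundedEventTimeOverProcessFunction                -> RowTimeUnboundedOver (Rows and Range variants)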


http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 947775b..2224752 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -17,20 +17,21 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
+import java.util.{List => JList}
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.types.Row
-import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rel.core.Window.Group
-import java.util.{List => JList}
 
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.table.codegen.CodeGenerator
@@ -90,12 +91,20 @@ class DataStreamOverAggregate(
 
     val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val orderKeys = overWindow.orderKeys.getFieldCollations
 
-    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
+    if (orderKeys.size() != 1) {
       throw new TableException(
-        "Unsupported use of OVER windows. The window may only be ordered by a single time column.")
+        "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
     }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val generator = new CodeGenerator(
       tableEnv.getConfig,
@@ -104,78 +113,69 @@ class DataStreamOverAggregate(
 
     val timeType = inputType
       .getFieldList
-      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
+      .get(orderKey.getFieldIndex)
       .getValue
+
     timeType match {
       case _: ProcTimeType =>
         // proc-time OVER window
         if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // unbounded preceding OVER window
-          createUnboundedAndCurrentRowProcessingTimeOverWindow(
+          // unbounded OVER window
+          createUnboundedAndCurrentRowOverWindow(
             generator,
-            inputDS)
+            inputDS,
+            isRowTimeType = false,
+            isRowsClause = overWindow.isRows)
         } else if (
           overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
-              overWindow.upperBound.isCurrentRow) {
+            overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
-          if (overWindow.isRows) {
-            // ROWS clause bounded OVER window
-            createBoundedAndCurrentRowOverWindow(
-              generator,
-              inputDS,
-              isRangeClause = false,
-              isRowTimeType = false)
-          } else {
-            // RANGE clause bounded OVER window
-            createBoundedAndCurrentRowOverWindow(
-              generator,
-              inputDS,
-              isRangeClause = true,
-              isRowTimeType = false)
-          }
+          createBoundedAndCurrentRowOverWindow(
+            generator,
+            inputDS,
+            isRowTimeType = false,
+            isRowsClause = overWindow.isRows
+          )
         } else {
           throw new TableException(
-            "processing-time OVER RANGE FOLLOWING window is not supported yet.")
+            "OVER RANGE FOLLOWING windows are not supported yet.")
         }
       case _: RowTimeType =>
         // row-time OVER window
         if (overWindow.lowerBound.isPreceding &&
-              overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // ROWS/RANGE clause unbounded OVER window
-          createUnboundedAndCurrentRowEventTimeOverWindow(
+          overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          // unbounded OVER window
+          createUnboundedAndCurrentRowOverWindow(
             generator,
             inputDS,
-            overWindow.isRows)
+            isRowTimeType = true,
+            isRowsClause = overWindow.isRows
+          )
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
-          if (overWindow.isRows) {
-            // ROWS clause bounded OVER window
-            createBoundedAndCurrentRowOverWindow(
-              generator,
-              inputDS,
-              isRangeClause = false,
-              isRowTimeType = true)
-          } else {
-            // RANGE clause bounded OVER window
-            createBoundedAndCurrentRowOverWindow(
-              generator,
-              inputDS,
-              isRangeClause = true,
-              isRowTimeType = true)
-          }
+          createBoundedAndCurrentRowOverWindow(
+            generator,
+            inputDS,
+            isRowTimeType = true,
+            isRowsClause = overWindow.isRows
+          )
         } else {
           throw new TableException(
-            "row-time OVER RANGE FOLLOWING window is not supported yet.")
+            "OVER RANGE FOLLOWING windows are not supported yet.")
         }
       case _ =>
-        throw new TableException(s"Unsupported time type {$timeType}")
+        throw new TableException(
+          "Unsupported time type {$timeType}. " +
+            "OVER windows do only support RowTimeType and ProcTimeType.")
     }
 
   }
 
-  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
+  def createUnboundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row]): DataStream[Row] = {
+    inputDS: DataStream[Row],
+    isRowTimeType: Boolean,
+    isRowsClause: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -184,14 +184,17 @@ class DataStreamOverAggregate(
     // get the output types
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
+    val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
+      generator,
+      namedAggregates,
+      inputType,
+      isRowTimeType,
+      partitionKeys.nonEmpty,
+      isRowsClause)
+
     val result: DataStream[Row] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
-        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
-          generator,
-          namedAggregates,
-          inputType)
-
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
@@ -201,17 +204,19 @@ class DataStreamOverAggregate(
       }
       // non-partitioned aggregation
       else {
-        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
-          generator,
-          namedAggregates,
-          inputType,
-          isPartitioned = false)
-
-        inputDS
-          .process(processFunction).setParallelism(1).setMaxParallelism(1)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
+        if (isRowTimeType) {
+          inputDS.keyBy(new NullByteKeySelector[Row])
+            .process(processFunction).setParallelism(1).setMaxParallelism(1)
+            .returns(rowTypeInfo)
+            .name(aggOpName)
+            .asInstanceOf[DataStream[Row]]
+        } else {
+          inputDS
+            .process(processFunction).setParallelism(1).setMaxParallelism(1)
+            .returns(rowTypeInfo)
+            .name(aggOpName)
+            .asInstanceOf[DataStream[Row]]
+        }
       }
     result
   }
@@ -219,15 +224,15 @@ class DataStreamOverAggregate(
   def createBoundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
     inputDS: DataStream[Row],
-    isRangeClause: Boolean,
-    isRowTimeType: Boolean): DataStream[Row] = {
+    isRowTimeType: Boolean,
+    isRowsClause: Boolean): DataStream[Row] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     val precedingOffset =
-      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRangeClause) 0 else 1)
+      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
 
     // get the output types
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
@@ -237,8 +242,9 @@ class DataStreamOverAggregate(
       namedAggregates,
       inputType,
       precedingOffset,
-      isRangeClause,
-      isRowTimeType)
+      isRowsClause,
+      isRowTimeType
+    )
     val result: DataStream[Row] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
@@ -253,49 +259,7 @@ class DataStreamOverAggregate(
       else {
         inputDS
           .keyBy(new NullByteKeySelector[Row])
-          .process(processFunction)
-          .setParallelism(1)
-          .setMaxParallelism(1)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
-      }
-    result
-  }
-
-  def createUnboundedAndCurrentRowEventTimeOverWindow(
-    generator: CodeGenerator,
-    inputDS: DataStream[Row],
-    isRows: Boolean): DataStream[Row] = {
-
-    val overWindow: Group = logicWindow.groups.get(0)
-    val partitionKeys: Array[Int] = overWindow.keys.toArray
-    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
-
-    // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
-
-    val processFunction = AggregateUtil.createUnboundedEventTimeOverProcessFunction(
-      generator,
-      namedAggregates,
-      inputType,
-      isRows)
-
-    val result: DataStream[Row] =
-      // partitioned aggregation
-      if (partitionKeys.nonEmpty) {
-        inputDS.keyBy(partitionKeys: _*)
-          .process(processFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
-      }
-      // global non-partitioned aggregation
-      else {
-        inputDS.keyBy(new NullByteKeySelector[Row])
-          .process(processFunction)
-          .setParallelism(1)
-          .setMaxParallelism(1)
+          .process(processFunction).setParallelism(1).setMaxParallelism(1)
           .returns(rowTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[Row]]
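
The non-partitioned row-time branch above keys the stream on a constant byte so that keyed state and timers are available to the ProcessFunction while it still runs as a single global instance. A minimal standalone sketch of the same pattern (assuming a DataStream[Row] named inputDS and a ProcessFunction[Row, Row] named processFunction):

    import org.apache.flink.api.java.functions.NullByteKeySelector
    import org.apache.flink.types.Row

    val result = inputDS
      .keyBy(new NullByteKeySelector[Row])  // every row maps to the same key
      .process(processFunction)
      .setParallelism(1)                    // a single instance computes the global aggregate
      .setMaxParallelism(1)                 // keep it at one even after rescaling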

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index fc03ac1..09d1a13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -55,21 +55,23 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final
-    * aggregate value.
+    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for an unbounded
+    * OVER window to evaluate the final aggregate value.
     *
     * @param generator       code generator instance
     * @param namedAggregates List of calls to aggregate functions and their output field names
-    * @param inputType       Input row type
-    * @param isPartitioned   Flag to indicate whether the input is partitioned or not
-    *
-    * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+    * @param inputType       Input row type
+    * @param isRowTimeType   Flag that indicates whether the time type is RowTimeType
+    * @param isPartitioned   Flag that indicates whether the input is partitioned
+    * @param isRowsClause    Flag that indicates whether the OVER clause is a ROWS clause
     */
-  private[flink] def createUnboundedProcessingOverProcessFunction(
-      generator: CodeGenerator,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      isPartitioned: Boolean = true): ProcessFunction[Row, Row] = {
+  private[flink] def createUnboundedOverProcessFunction(
+    generator: CodeGenerator,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    isRowTimeType: Boolean,
+    isPartitioned: Boolean,
+    isRowsClause: Boolean): ProcessFunction[Row, Row] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
@@ -95,14 +97,30 @@ object AggregateUtil {
       outputArity
     )
 
-    if (isPartitioned) {
-      new UnboundedProcessingOverProcessFunction(
-        genFunction,
-        aggregationStateType)
+    if (isRowTimeType) {
+      if (isRowsClause) {
+        // ROWS unbounded over process function
+        new RowTimeUnboundedRowsOver(
+          genFunction,
+          aggregationStateType,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      } else {
+        // RANGE unbounded over process function
+        new RowTimeUnboundedRangeOver(
+          genFunction,
+          aggregationStateType,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      }
     } else {
-      new UnboundedNonPartitionedProcessingOverProcessFunction(
-        genFunction,
-        aggregationStateType)
+      if (isPartitioned) {
+        new ProcTimeUnboundedPartitionedOver(
+          genFunction,
+          aggregationStateType)
+      } else {
+        new ProcTimeUnboundedNonPartitionedOver(
+          genFunction,
+          aggregationStateType)
+      }
     }
   }
 
@@ -114,7 +132,7 @@ object AggregateUtil {
     * @param namedAggregates List of calls to aggregate functions and their output field names
     * @param inputType       Input row type
     * @param precedingOffset the preceding offset
-    * @param isRangeClause   It is a tag that indicates whether the OVER clause is rangeClause
+    * @param isRowsClause    Flag that indicates whether the OVER clause is a ROWS clause
     * @param isRowTimeType   Flag that indicates whether the time type is RowTimeType
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
@@ -123,7 +141,7 @@ object AggregateUtil {
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
     precedingOffset: Long,
-    isRangeClause: Boolean,
+    isRowsClause: Boolean,
     isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
 
     val (aggFields, aggregates) =
@@ -151,15 +169,15 @@ object AggregateUtil {
     )
 
     if (isRowTimeType) {
-      if (isRangeClause) {
-        new RangeClauseBoundedOverProcessFunction(
+      if (isRowsClause) {
+        new RowTimeBoundedRowsOver(
           genFunction,
           aggregationStateType,
           inputRowType,
           precedingOffset
         )
       } else {
-        new RowsClauseBoundedOverProcessFunction(
+        new RowTimeBoundedRangeOver(
           genFunction,
           aggregationStateType,
           inputRowType,
@@ -167,14 +185,14 @@ object AggregateUtil {
         )
       }
     } else {
-      if (isRangeClause) {
-        new BoundedProcessingOverRangeProcessFunction(
+      if (isRowsClause) {
+        new ProcTimeBoundedRowsOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
           inputRowType)
       } else {
-        new BoundedProcessingOverRowProcessFunction(
+        new ProcTimeBoundedRangeOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
@@ -183,58 +201,6 @@ object AggregateUtil {
     }
   }
 
-  /**
-    * Create an [[ProcessFunction]] to evaluate final aggregate value.
-    *
-    * @param generator       code generator instance
-    * @param namedAggregates List of calls to aggregate functions and their output field names
-    * @param inputType       Input row type
-    * @param isRows          Flag to indicate if whether this is a Row (true) or a Range (false)
-    *                        over window process
-    * @return [[UnboundedEventTimeOverProcessFunction]]
-    */
-  private[flink] def createUnboundedEventTimeOverProcessFunction(
-      generator: CodeGenerator,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      isRows: Boolean): UnboundedEventTimeOverProcessFunction = {
-
-    val (aggFields, aggregates) =
-      transformToAggregateFunctions(
-        namedAggregates.map(_.getKey),
-        inputType,
-        needRetraction = false)
-
-    val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
-
-    val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
-    val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
-    val outputArity = inputType.getFieldCount + aggregates.length
-
-    val genFunction = generator.generateAggregations(
-      "UnboundedEventTimeOverAggregateHelper",
-      generator,
-      inputType,
-      aggregates,
-      aggFields,
-      aggMapping,
-      forwardMapping,
-      outputArity)
-
-    if (isRows) {
-      // ROWS unbounded over process function
-      new UnboundedEventTimeRowsOverProcessFunction(
-        genFunction,
-        aggregationStateType,
-        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
-    } else {
-      // RANGE unbounded over process function
-      new UnboundedEventTimeRangeOverProcessFunction(
-        genFunction,
-        aggregationStateType,
-        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
-    }
-  }
 
   /**
     * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
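
With the consolidation above, a single factory method covers all unbounded OVER cases and two flags select the concrete ProcessFunction. A hedged sketch of a call for a partitioned proc-time window (argument values are illustrative):

    val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
      generator,              // CodeGenerator for the aggregation helper
      namedAggregates,        // Seq[CalcitePair[AggregateCall, String]]
      inputType,              // RelDataType of the input rows
      isRowTimeType = false,  // proc-time -> ProcTimeUnbounded*Over
      isPartitioned = true,   // -> ProcTimeUnboundedPartitionedOver
      isRowsClause = false)   // only consulted for row-time windows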

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
deleted file mode 100644
index 8f3aa3e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.common.state.MapState
-import org.apache.flink.api.common.state.MapStateDescriptor
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ListTypeInfo
-import java.util.{ArrayList, List => JList}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function used for the aggregate in bounded proc-time OVER window
-  * [[org.apache.flink.streaming.api.datastream.DataStream]]
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param precedingTimeBoundary    Is used to indicate the processing time boundaries
-  * @param aggregatesTypeInfo       row type info of aggregation
-  * @param inputType                row type info of input row
-  */
-class BoundedProcessingOverRangeProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    precedingTimeBoundary: Long,
-    aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  private var output: Row = _
-  private var accumulatorState: ValueState[Row] = _
-  private var rowMapState: MapState[Long, JList[Row]] = _
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-    output = function.createOutputRow()
-
-    // We keep the elements received in a MapState indexed based on their ingestion time
-    val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
-    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
-        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
-    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
-
-    val stateDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("overState", aggregatesTypeInfo)
-    accumulatorState = getRuntimeContext.getState(stateDescriptor)
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    val currentTime = ctx.timerService.currentProcessingTime
-    // buffer the event incoming event
-
-    // add current element to the window list of elements with corresponding timestamp
-    var rowList = rowMapState.get(currentTime)
-    // null value means that this si the first event received for this timestamp
-    if (rowList == null) {
-      rowList = new ArrayList[Row]()
-      // register timer to process event once the current millisecond passed
-      ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
-    }
-    rowList.add(input)
-    rowMapState.put(currentTime, rowList)
-
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
-
-    // we consider the original timestamp of events that have registered this time trigger 1 ms ago
-    val currentTime = timestamp - 1
-    var i = 0
-
-    // initialize the accumulators
-    var accumulators = accumulatorState.value()
-
-    if (null == accumulators) {
-      accumulators = function.createAccumulators()
-    }
-
-    // update the elements to be removed and retract them from aggregators
-    val limit = currentTime - precedingTimeBoundary
-
-    // we iterate through all elements in the window buffer based on timestamp keys
-    // when we find timestamps that are out of interest, we retrieve corresponding elements
-    // and eliminate them. Multiple elements could have been received at the same timestamp
-    // the removal of old elements happens only once per proctime as onTimer is called only once
-    val iter = rowMapState.keys.iterator
-    val markToRemove = new ArrayList[Long]()
-    while (iter.hasNext) {
-      val elementKey = iter.next
-      if (elementKey < limit) {
-        // element key outside of window. Retract values
-        val elementsRemove = rowMapState.get(elementKey)
-        var iRemove = 0
-        while (iRemove < elementsRemove.size()) {
-          val retractRow = elementsRemove.get(iRemove)
-          function.retract(accumulators, retractRow)
-          iRemove += 1
-        }
-        // mark element for later removal not to modify the iterator over MapState
-        markToRemove.add(elementKey)
-      }
-    }
-    // need to remove in 2 steps not to have concurrent access errors via iterator to the MapState
-    i = 0
-    while (i < markToRemove.size()) {
-      rowMapState.remove(markToRemove.get(i))
-      i += 1
-    }
-
-    // get the list of elements of current proctime
-    val currentElements = rowMapState.get(currentTime)
-    // add current elements to aggregator. Multiple elements might have arrived in the same proctime
-    // the same accumulator value will be computed for all elements
-    var iElemenets = 0
-    while (iElemenets < currentElements.size()) {
-      val input = currentElements.get(iElemenets)
-      function.accumulate(accumulators, input)
-      iElemenets += 1
-    }
-
-    // we need to build the output and emit for every event received at this proctime
-    iElemenets = 0
-    while (iElemenets < currentElements.size()) {
-      val input = currentElements.get(iElemenets)
-
-      // set the fields of the last event to carry on with the aggregates
-      function.setForwardedFields(input, output)
-
-      // add the accumulators values to result
-      function.setAggregationResults(accumulators, output)
-      out.collect(output)
-      iElemenets += 1
-    }
-
-    // update the value of accumulators for future incremental computation
-    accumulatorState.update(accumulators)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
deleted file mode 100644
index d5ee4ae..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.api.common.state.MapState
-import org.apache.flink.api.common.state.MapStateDescriptor
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ListTypeInfo
-import java.util.{List => JList}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for ROW clause processing-time bounded OVER window
-  *
-  * @param genAggregations      Generated aggregate helper function
-  * @param precedingOffset      preceding offset
-  * @param aggregatesTypeInfo   row type info of aggregation
-  * @param inputType            row type info of input row
-  */
-class BoundedProcessingOverRowProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    precedingOffset: Long,
-    aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  Preconditions.checkArgument(precedingOffset > 0)
-
-  private var accumulatorState: ValueState[Row] = _
-  private var rowMapState: MapState[Long, JList[Row]] = _
-  private var output: Row = _
-  private var counterState: ValueState[Long] = _
-  private var smallestTsState: ValueState[Long] = _
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-    // We keep the elements received in a Map state keyed
-    // by the ingestion time in the operator.
-    // we also keep counter of processed elements
-    // and timestamp of oldest element
-    val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
-
-    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
-        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
-    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
-
-    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
-    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
-
-    val processedCountDescriptor : ValueStateDescriptor[Long] =
-       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
-    counterState = getRuntimeContext.getState(processedCountDescriptor)
-
-    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
-       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
-    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    val currentTime = ctx.timerService.currentProcessingTime
-
-    // initialize state for the processed element
-    var accumulators = accumulatorState.value
-    if (accumulators == null) {
-      accumulators = function.createAccumulators()
-    }
-
-    // get smallest timestamp
-    var smallestTs = smallestTsState.value
-    if (smallestTs == 0L) {
-      smallestTs = currentTime
-      smallestTsState.update(smallestTs)
-    }
-    // get previous counter value
-    var counter = counterState.value
-
-    if (counter == precedingOffset) {
-      val retractList = rowMapState.get(smallestTs)
-
-      // get oldest element beyond buffer size
-      // and if oldest element exist, retract value
-      val retractRow = retractList.get(0)
-      function.retract(accumulators, retractRow)
-      retractList.remove(0)
-
-      // if reference timestamp list not empty, keep the list
-      if (!retractList.isEmpty) {
-        rowMapState.put(smallestTs, retractList)
-      } // if smallest timestamp list is empty, remove and find new smallest
-      else {
-        rowMapState.remove(smallestTs)
-        val iter = rowMapState.keys.iterator
-        var currentTs: Long = 0L
-        var newSmallestTs: Long = Long.MaxValue
-        while (iter.hasNext) {
-          currentTs = iter.next
-          if (currentTs < newSmallestTs) {
-            newSmallestTs = currentTs
-          }
-        }
-        smallestTsState.update(newSmallestTs)
-      }
-    } // we update the counter only while buffer is getting filled
-    else {
-      counter += 1
-      counterState.update(counter)
-    }
-
-    // copy forwarded fields in output row
-    function.setForwardedFields(input, output)
-
-    // accumulate current row and set aggregate in output row
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
-
-    // update map state, accumulator state, counter and timestamp
-    val currentTimeState = rowMapState.get(currentTime)
-    if (currentTimeState != null) {
-      currentTimeState.add(input)
-      rowMapState.put(currentTime, currentTimeState)
-    } else { // add new input
-      val newList = new util.ArrayList[Row]
-      newList.add(input)
-      rowMapState.put(currentTime, newList)
-    }
-
-    accumulatorState.update(accumulators)
-
-    out.collect(output)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
new file mode 100644
index 0000000..7f87e50
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function used for the aggregate in bounded proc-time OVER window
+  * [[org.apache.flink.streaming.api.datastream.DataStream]]
+  *
+  * @param genAggregations Generated aggregate helper function
+  * @param precedingTimeBoundary    Is used to indicate the processing time boundaries
+  * @param aggregatesTypeInfo       row type info of aggregation
+  * @param inputType                row type info of input row
+  */
+class ProcTimeBoundedRangeOver(
+    genAggregations: GeneratedAggregationsFunction,
+    precedingTimeBoundary: Long,
+    aggregatesTypeInfo: RowTypeInfo,
+    inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+                s"Code:\n$genAggregations.code")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+    output = function.createOutputRow()
+
+    // We keep the elements received in a MapState indexed based on their ingestion time
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    val currentTime = ctx.timerService.currentProcessingTime
+    // buffer the incoming event
+
+    // add current element to the window list of elements with corresponding timestamp
+    var rowList = rowMapState.get(currentTime)
+    // a null value means that this is the first event received for this timestamp
+    if (rowList == null) {
+      rowList = new ArrayList[Row]()
+      // register timer to process event once the current millisecond passed
+      ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+    }
+    rowList.add(input)
+    rowMapState.put(currentTime, rowList)
+
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    // the timer fires 1 ms after the events arrived, so their original timestamp is timestamp - 1
+    val currentTime = timestamp - 1
+    var i = 0
+
+    // initialize the accumulators
+    var accumulators = accumulatorState.value()
+
+    if (null == accumulators) {
+      accumulators = function.createAccumulators()
+    }
+
+    // update the elements to be removed and retract them from aggregators
+    val limit = currentTime - precedingTimeBoundary
+
+    // we iterate through all elements in the window buffer based on timestamp keys
+    // when we find timestamps that are out of interest, we retrieve corresponding elements
+    // and eliminate them. Multiple elements could have been received at the same timestamp
+    // the removal of old elements happens only once per proctime as onTimer is called only once
+    val iter = rowMapState.keys.iterator
+    val markToRemove = new ArrayList[Long]()
+    while (iter.hasNext) {
+      val elementKey = iter.next
+      if (elementKey < limit) {
+        // element key outside of window. Retract values
+        val elementsRemove = rowMapState.get(elementKey)
+        var iRemove = 0
+        while (iRemove < elementsRemove.size()) {
+          val retractRow = elementsRemove.get(iRemove)
+          function.retract(accumulators, retractRow)
+          iRemove += 1
+        }
+        // mark element for later removal not to modify the iterator over MapState
+        markToRemove.add(elementKey)
+      }
+    }
+    // need to remove in 2 steps not to have concurrent access errors via iterator to the MapState
+    i = 0
+    while (i < markToRemove.size()) {
+      rowMapState.remove(markToRemove.get(i))
+      i += 1
+    }
+
+    // get the list of elements of current proctime
+    val currentElements = rowMapState.get(currentTime)
+    // add current elements to aggregator. Multiple elements might have arrived in the same proctime
+    // the same accumulator value will be computed for all elements
+    var iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElements)
+      function.accumulate(accumulators, input)
+      iElements += 1
+    }
+
+    // we need to build the output and emit for every event received at this proctime
+    iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElements)
+
+      // set the fields of the current event to carry on with the aggregates
+      function.setForwardedFields(input, output)
+
+      // add the accumulator values to the result
+      function.setAggregationResults(accumulators, output)
+      out.collect(output)
+      iElements += 1
+    }
+
+    // update the value of accumulators for future incremental computation
+    accumulatorState.update(accumulators)
+
+  }
+
+}
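
ProcTimeBoundedRangeOver emits results only from onTimer: rows arriving at processing time t are buffered under key t and a timer is registered for t + 1, so all rows of the same millisecond receive the same aggregate. A hedged trace, assuming precedingTimeBoundary = 5000 ms:

    processElement at proctime 10000: buffer the row under key 10000,
      register a processing-time timer for 10001
    onTimer(10001): currentTime = 10001 - 1 = 10000, limit = 10000 - 5000 = 5000
      retract and remove all buffered rows under keys < 5000
      accumulate the rows buffered at key 10000
      emit one output row (with identical aggregates) per row buffered at 10000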

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
new file mode 100644
index 0000000..31cfd73
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for ROW clause processing-time bounded OVER window
+  *
+  * @param genAggregations      Generated aggregate helper function
+  * @param precedingOffset      preceding offset
+  * @param aggregatesTypeInfo   row type info of aggregation
+  * @param inputType            row type info of input row
+  */
+class ProcTimeBoundedRowsOver(
+    genAggregations: GeneratedAggregationsFunction,
+    precedingOffset: Long,
+    aggregatesTypeInfo: RowTypeInfo,
+    inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+                s"Code:\n$genAggregations.code")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+    // We keep the elements received in a Map state keyed
+    // by the ingestion time in the operator.
+    // we also keep counter of processed elements
+    // and timestamp of oldest element
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    val currentTime = ctx.timerService.currentProcessingTime
+
+    // initialize state for the processed element
+    var accumulators = accumulatorState.value
+    if (accumulators == null) {
+      accumulators = function.createAccumulators()
+    }
+
+    // get smallest timestamp
+    var smallestTs = smallestTsState.value
+    if (smallestTs == 0L) {
+      smallestTs = currentTime
+      smallestTsState.update(smallestTs)
+    }
+    // get previous counter value
+    var counter = counterState.value
+
+    if (counter == precedingOffset) {
+      val retractList = rowMapState.get(smallestTs)
+
+      // get the oldest element beyond the buffer size
+      // and if the oldest element exists, retract its value
+      val retractRow = retractList.get(0)
+      function.retract(accumulators, retractRow)
+      retractList.remove(0)
+
+      // if reference timestamp list not empty, keep the list
+      if (!retractList.isEmpty) {
+        rowMapState.put(smallestTs, retractList)
+      } // if smallest timestamp list is empty, remove and find new smallest
+      else {
+        rowMapState.remove(smallestTs)
+        val iter = rowMapState.keys.iterator
+        var currentTs: Long = 0L
+        var newSmallestTs: Long = Long.MaxValue
+        while (iter.hasNext) {
+          currentTs = iter.next
+          if (currentTs < newSmallestTs) {
+            newSmallestTs = currentTs
+          }
+        }
+        smallestTsState.update(newSmallestTs)
+      }
+    } // we update the counter only while buffer is getting filled
+    else {
+      counter += 1
+      counterState.update(counter)
+    }
+
+    // copy forwarded fields in output row
+    function.setForwardedFields(input, output)
+
+    // accumulate current row and set aggregate in output row
+    function.accumulate(accumulators, input)
+    function.setAggregationResults(accumulators, output)
+
+    // update map state, accumulator state, counter and timestamp
+    val currentTimeState = rowMapState.get(currentTime)
+    if (currentTimeState != null) {
+      currentTimeState.add(input)
+      rowMapState.put(currentTime, currentTimeState)
+    } else { // add new input
+      val newList = new util.ArrayList[Row]
+      newList.add(input)
+      rowMapState.put(currentTime, newList)
+    }
+
+    accumulatorState.update(accumulators)
+
+    out.collect(output)
+  }
+
+}
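
In ProcTimeBoundedRowsOver, precedingOffset is the ROWS lower boundary plus one (see DataStreamOverAggregate above), i.e. the number of rows in the window. A hedged trace for ROWS BETWEEN 1 PRECEDING AND CURRENT ROW, so precedingOffset = 2:

    r1 arrives: counter 0 -> 1, accumulate r1,                   emit agg(r1)
    r2 arrives: counter 1 -> 2, accumulate r2,                   emit agg(r1, r2)
    r3 arrives: counter == 2, retract oldest r1, accumulate r3,  emit agg(r2, r3)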

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
new file mode 100644
index 0000000..6b9800b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for non-partitioned processing-time unbounded OVER window
+  *
+  * @param genAggregations Generated aggregate helper function
+  * @param aggregationStateType     row type info of aggregation
+  */
+class ProcTimeUnboundedNonPartitionedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo)
+  extends ProcessFunction[Row, Row]
+    with CheckpointedFunction
+    with Compiler[GeneratedAggregations] {
+
+  private var accumulators: Row = _
+  private var output: Row = _
+  private var state: ListState[Row] = _
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+                s"Code:\n$genAggregations.code")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+    if (null == accumulators) {
+      val it = state.get().iterator()
+      if (it.hasNext) {
+        accumulators = it.next()
+      } else {
+        accumulators = function.createAccumulators()
+      }
+    }
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    function.setForwardedFields(input, output)
+
+    function.accumulate(accumulators, input)
+    function.setAggregationResults(accumulators, output)
+
+    out.collect(output)
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    state.clear()
+    if (null != accumulators) {
+      state.add(accumulators)
+    }
+  }
+
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
+    state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
+  }
+}
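
Because this variant is non-partitioned, the single accumulator row is checkpointed
through operator ListState (snapshotState/initializeState above) instead of keyed
state, which also means the operator must run with parallelism 1. A wiring sketch
under assumed names: inputDS, genAggFunction, accTypeInfo and returnTypeInfo are all
hypothetical and not taken from this commit.

import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.codegen.GeneratedAggregationsFunction
import org.apache.flink.table.runtime.aggregate.ProcTimeUnboundedNonPartitionedOver
import org.apache.flink.types.Row

object NonPartitionedOverWiringSketch {

  // all parameter names are illustrative
  def wire(
      inputDS: DataStream[Row],
      genAggFunction: GeneratedAggregationsFunction,
      accTypeInfo: RowTypeInfo,
      returnTypeInfo: RowTypeInfo): DataStream[Row] = {
    inputDS
      .process(new ProcTimeUnboundedNonPartitionedOver(genAggFunction, accTypeInfo))
      // the single global accumulator lives in operator state,
      // so the operator must not run in parallel
      .setParallelism(1)
      .setMaxParallelism(1)
      .returns(returnTypeInfo)
  }
}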

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
new file mode 100644
index 0000000..9baa6a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for partitioned processing-time unbounded OVER window
+  *
+  * @param genAggregations      generated aggregate helper function
+  * @param aggregationStateType row type info of the aggregation
+  */
+class ProcTimeUnboundedPartitionedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo)
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+
+  private var output: Row = _
+  private var state: ValueState[Row] = _
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    var accumulators = state.value()
+
+    if (null == accumulators) {
+      accumulators = function.createAccumulators()
+    }
+
+    function.setForwardedFields(input, output)
+
+    function.accumulate(accumulators, input)
+    function.setAggregationResults(accumulators, output)
+
+    state.update(accumulators)
+
+    out.collect(output)
+  }
+
+}
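
The partitioned variant keeps one accumulator row per key in keyed ValueState, so it
has to be applied behind a keyBy. A sketch under the same hypothetical names as above,
with partitionKeys standing in for the grouping fields:

import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.codegen.GeneratedAggregationsFunction
import org.apache.flink.table.runtime.aggregate.ProcTimeUnboundedPartitionedOver
import org.apache.flink.types.Row

object PartitionedOverWiringSketch {

  // all parameter names are illustrative
  def wire(
      inputDS: DataStream[Row],
      partitionKeys: Array[Int],
      genAggFunction: GeneratedAggregationsFunction,
      accTypeInfo: RowTypeInfo,
      returnTypeInfo: RowTypeInfo): DataStream[Row] = {
    inputDS
      // keying gives the function per-partition ValueState for its accumulators
      .keyBy(partitionKeys: _*)
      .process(new ProcTimeUnboundedPartitionedOver(genAggFunction, accTypeInfo))
      .returns(returnTypeInfo)
  }
}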

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
deleted file mode 100644
index 0f1ef49..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util.{List => JList, ArrayList => JArrayList}
-
-import org.apache.flink.api.common.state._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
-import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for RANGE clause event-time bounded OVER window
- *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  * @param inputRowType             row type info of input row
-  * @param precedingOffset          preceding offset
- */
-class RangeClauseBoundedOverProcessFunction(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    inputRowType: RowTypeInfo,
-    precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
-    with Compiler[GeneratedAggregations] {
-
-  Preconditions.checkNotNull(aggregationStateType)
-  Preconditions.checkNotNull(precedingOffset)
-
-  private var output: Row = _
-
-  // the state which keeps the last triggering timestamp
-  private var lastTriggeringTsState: ValueState[Long] = _
-
-  // the state which used to materialize the accumulator for incremental calculation
-  private var accumulatorState: ValueState[Row] = _
-
-  // the state which keeps all the data that are not expired.
-  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
-  // the second element of tuple is a list that contains the entire data of all the rows belonging
-  // to this time stamp.
-  private var dataState: MapState[Long, JList[Row]] = _
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
-                s"Code:\n$genAggregations.code")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = function.createOutputRow()
-
-    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
-      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
-    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
-
-    val accumulatorStateDescriptor =
-      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
-    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
-
-    val keyTypeInformation: TypeInformation[Long] =
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
-
-    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-      new MapStateDescriptor[Long, JList[Row]](
-        "dataState",
-        keyTypeInformation,
-        valueTypeInformation)
-
-    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
-  }
-
-  override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
-
-    // triggering timestamp for trigger calculation
-    val triggeringTs = ctx.timestamp
-
-    val lastTriggeringTs = lastTriggeringTsState.value
-
-    // check if the data is expired, if not, save the data and register event time timer
-    if (triggeringTs > lastTriggeringTs) {
-      val data = dataState.get(triggeringTs)
-      if (null != data) {
-        data.add(input)
-        dataState.put(triggeringTs, data)
-      } else {
-        val data = new JArrayList[Row]
-        data.add(input)
-        dataState.put(triggeringTs, data)
-        // register event time timer
-        ctx.timerService.registerEventTimeTimer(triggeringTs)
-      }
-    }
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
-    // gets all window data from state for the calculation
-    val inputs: JList[Row] = dataState.get(timestamp)
-
-    if (null != inputs) {
-
-      var accumulators = accumulatorState.value
-      var dataListIndex = 0
-      var aggregatesIndex = 0
-
-      // initialize when first run or failover recovery per key
-      if (null == accumulators) {
-        accumulators = function.createAccumulators()
-        aggregatesIndex = 0
-      }
-
-      // keep up timestamps of retract data
-      val retractTsList: JList[Long] = new JArrayList[Long]
-
-      // do retraction
-      val dataTimestampIt = dataState.keys.iterator
-      while (dataTimestampIt.hasNext) {
-        val dataTs: Long = dataTimestampIt.next()
-        val offset = timestamp - dataTs
-        if (offset > precedingOffset) {
-          val retractDataList = dataState.get(dataTs)
-          dataListIndex = 0
-          while (dataListIndex < retractDataList.size()) {
-            val retractRow = retractDataList.get(dataListIndex)
-            function.retract(accumulators, retractRow)
-            dataListIndex += 1
-          }
-          retractTsList.add(dataTs)
-        }
-      }
-
-      // do accumulation
-      dataListIndex = 0
-      while (dataListIndex < inputs.size()) {
-        val curRow = inputs.get(dataListIndex)
-        // accumulate current row
-        function.accumulate(accumulators, curRow)
-        dataListIndex += 1
-      }
-
-      // set aggregate in output row
-      function.setAggregationResults(accumulators, output)
-
-      // copy forwarded fields to output row and emit output row
-      dataListIndex = 0
-      while (dataListIndex < inputs.size()) {
-        aggregatesIndex = 0
-        function.setForwardedFields(inputs.get(dataListIndex), output)
-        out.collect(output)
-        dataListIndex += 1
-      }
-
-      // remove the data that has been retracted
-      dataListIndex = 0
-      while (dataListIndex < retractTsList.size) {
-        dataState.remove(retractTsList.get(dataListIndex))
-        dataListIndex += 1
-      }
-
-      // update state
-      accumulatorState.update(accumulators)
-      lastTriggeringTsState.update(timestamp)
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
new file mode 100644
index 0000000..03ca02c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for RANGE clause event-time bounded OVER window
+  *
+  * @param genAggregations      generated aggregate helper function
+  * @param aggregationStateType row type info of the aggregation
+  * @param inputRowType         row type info of the input row
+  * @param precedingOffset      preceding offset
+  */
+class RowTimeBoundedRangeOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo,
+    inputRowType: RowTypeInfo,
+    precedingOffset: Long)
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which is used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all rows that have not expired yet.
+  // The map key is the timestamp of the rows. For each timestamp,
+  // the value is a list that contains all rows that were received
+  // for that timestamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    // the timestamp of the incoming row, which also serves as the firing time of its timer
+    val triggeringTs = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+
+    // check whether the row is expired; if not, save it and register an event-time timer
+    if (triggeringTs > lastTriggeringTs) {
+      val data = dataState.get(triggeringTs)
+      if (null != data) {
+        data.add(input)
+        dataState.put(triggeringTs, data)
+      } else {
+        val data = new JArrayList[Row]
+        data.add(input)
+        dataState.put(triggeringTs, data)
+        // register event time timer
+        ctx.timerService.registerEventTimeTimer(triggeringTs)
+      }
+    }
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+    // gets all window data from state for the calculation
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (null != inputs) {
+
+      var accumulators = accumulatorState.value
+      var dataListIndex = 0
+
+      // initialize when first run or failover recovery per key
+      if (null == accumulators) {
+        accumulators = function.createAccumulators()
+      }
+
+      // collect the timestamps of retracted rows so they can be removed from the state below
+      val retractTsList: JList[Long] = new JArrayList[Long]
+
+      // do retraction
+      val dataTimestampIt = dataState.keys.iterator
+      while (dataTimestampIt.hasNext) {
+        val dataTs: Long = dataTimestampIt.next()
+        val offset = timestamp - dataTs
+        if (offset > precedingOffset) {
+          val retractDataList = dataState.get(dataTs)
+          dataListIndex = 0
+          while (dataListIndex < retractDataList.size()) {
+            val retractRow = retractDataList.get(dataListIndex)
+            function.retract(accumulators, retractRow)
+            dataListIndex += 1
+          }
+          retractTsList.add(dataTs)
+        }
+      }
+
+      // do accumulation
+      dataListIndex = 0
+      while (dataListIndex < inputs.size()) {
+        val curRow = inputs.get(dataListIndex)
+        // accumulate current row
+        function.accumulate(accumulators, curRow)
+        dataListIndex += 1
+      }
+
+      // set aggregate in output row
+      function.setAggregationResults(accumulators, output)
+
+      // copy forwarded fields to output row and emit output row
+      dataListIndex = 0
+      while (dataListIndex < inputs.size()) {
+        function.setForwardedFields(inputs.get(dataListIndex), output)
+        out.collect(output)
+        dataListIndex += 1
+      }
+
+      // remove the data that has been retracted
+      dataListIndex = 0
+      while (dataListIndex < retractTsList.size) {
+        dataState.remove(retractTsList.get(dataListIndex))
+        dataListIndex += 1
+      }
+
+      // update state
+      accumulatorState.update(accumulators)
+      lastTriggeringTsState.update(timestamp)
+    }
+  }
+}
+
+
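
The onTimer method above always proceeds in the same order: retract every stored row
whose timestamp lies more than precedingOffset before the firing timestamp, accumulate
all rows of the firing timestamp, and then emit one output per input row, all carrying
the same aggregate values (RANGE semantics). A standalone sketch of that
retract/accumulate cycle, with a plain sum standing in for the generated aggregation
and no Flink involved:

import scala.collection.immutable.TreeMap

object RangeOverSketch {
  def main(args: Array[String]): Unit = {
    val precedingOffset = 2L // RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
    // timestamp -> values arriving at that timestamp
    val data = TreeMap(1L -> Seq(10), 2L -> Seq(20, 21), 4L -> Seq(40))

    var sum = 0                                // plays the role of the accumulator
    var state = TreeMap.empty[Long, Seq[Int]]  // plays the role of dataState

    for ((ts, rows) <- data) {
      // retract rows that fall out of [ts - precedingOffset, ts]
      val (expired, live) = state.partition { case (t, _) => ts - t > precedingOffset }
      expired.values.flatten.foreach(v => sum -= v)
      state = live
      // accumulate all rows of the firing timestamp
      rows.foreach(v => sum += v)
      state = state + (ts -> rows)
      // every row with this timestamp is emitted with the same aggregate
      rows.foreach(_ => println(s"ts=$ts sum=$sum"))
    }
    // prints: ts=1 sum=10, ts=2 sum=51 (twice), ts=4 sum=81 (row at ts=1 retracted)
  }
}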

http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
new file mode 100644
index 0000000..4a9a14c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for ROWS clause event-time bounded OVER window
+  *
+  * @param genAggregations      generated aggregate helper function
+  * @param aggregationStateType row type info of the aggregation
+  * @param inputRowType         row type info of the input row
+  * @param precedingOffset      preceding offset
+  */
+class RowTimeBoundedRowsOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo,
+    inputRowType: RowTypeInfo,
+    precedingOffset: Long)
+  extends ProcessFunction[Row, Row]
+    with Compiler[GeneratedAggregations] {
+
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which keeps the count of data
+  private var dataCountState: ValueState[Long] = _
+
+  // the state which is used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all rows that have not expired yet.
+  // The map key is the timestamp of the rows. For each timestamp,
+  // the value is a list that contains all rows that were received
+  // for that timestamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val dataCountStateDescriptor =
+      new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+    dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    // the timestamp of the incoming row, which also serves as the firing time of its timer
+    val triggeringTs = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+
+    // check whether the row is expired; if not, save it and register an event-time timer
+    if (triggeringTs > lastTriggeringTs) {
+      val data = dataState.get(triggeringTs)
+      if (null != data) {
+        data.add(input)
+        dataState.put(triggeringTs, data)
+      } else {
+        val data = new util.ArrayList[Row]
+        data.add(input)
+        dataState.put(triggeringTs, data)
+        // register event time timer
+        ctx.timerService.registerEventTimeTimer(triggeringTs)
+      }
+    }
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    // gets all window data from state for the calculation
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (null != inputs) {
+
+      var accumulators = accumulatorState.value
+      var dataCount = dataCountState.value
+
+      var retractList: JList[Row] = null
+      var retractTs: Long = Long.MaxValue
+      var retractCnt: Int = 0
+      var i = 0
+
+      while (i < inputs.size) {
+        val input = inputs.get(i)
+
+        // initialize when first run or failover recovery per key
+        if (null == accumulators) {
+          accumulators = function.createAccumulators()
+        }
+
+        var retractRow: Row = null
+
+        if (dataCount >= precedingOffset) {
+          if (null == retractList) {
+            // find the smallest timestamp
+            retractTs = Long.MaxValue
+            val dataTimestampIt = dataState.keys.iterator
+            while (dataTimestampIt.hasNext) {
+              val dataTs = dataTimestampIt.next
+              if (dataTs < retractTs) {
+                retractTs = dataTs
+              }
+            }
+            // get the oldest rows to retract them
+            retractList = dataState.get(retractTs)
+          }
+
+          retractRow = retractList.get(retractCnt)
+          retractCnt += 1
+
+          // remove retracted values from state
+          if (retractList.size == retractCnt) {
+            dataState.remove(retractTs)
+            retractList = null
+            retractCnt = 0
+          }
+        } else {
+          dataCount += 1
+        }
+
+        // copy forwarded fields to output row
+        function.setForwardedFields(input, output)
+
+        // retract old row from accumulators
+        if (null != retractRow) {
+          function.retract(accumulators, retractRow)
+        }
+
+        // accumulate current row and set aggregate in output row
+        function.accumulate(accumulators, input)
+        function.setAggregationResults(accumulators, output)
+        i += 1
+
+        out.collect(output)
+      }
+
+      // update all states
+      if (dataState.contains(retractTs)) {
+        if (retractCnt > 0) {
+          retractList.subList(0, retractCnt).clear()
+          dataState.put(retractTs, retractList)
+        }
+      }
+      dataCountState.update(dataCount)
+      accumulatorState.update(accumulators)
+    }
+
+    lastTriggeringTsState.update(timestamp)
+  }
+}
+
+
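
Unlike the RANGE variant, the ROWS variant retracts exactly one oldest row per incoming
row once dataCount has reached precedingOffset, so every row receives its own aggregate
value. A standalone sketch of that counting logic, again with a plain sum standing in
for the generated aggregation:

import scala.collection.mutable

object RowsOverSketch {
  def main(args: Array[String]): Unit = {
    val precedingOffset = 3                // keep a window of 3 rows including the current one
    val inputs = Seq(10, 20, 30, 40, 50)

    val buffer = mutable.Queue.empty[Int]  // plays the role of dataState
    var sum = 0                            // plays the role of the accumulator

    for (v <- inputs) {
      if (buffer.size >= precedingOffset) {
        sum -= buffer.dequeue()            // retract the oldest row
      }
      buffer.enqueue(v)
      sum += v                             // accumulate the current row
      println(s"value=$v sum=$sum")
    }
    // prints: 10, 30, 60, 90 (= 20+30+40), 120 (= 30+40+50)
  }
}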