Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/16 05:22:41 UTC

[flink] branch master updated: [FLINK-11945][table-runtime-blink] Support over aggregation for blink streaming runtime

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e6bbc09  [FLINK-11945][table-runtime-blink] Support over aggregation for blink streaming runtime
e6bbc09 is described below

commit e6bbc09e6999c08ae087559c365f94d2c1f38efa
Author: XuQianJin-Stars <x1...@163.com>
AuthorDate: Tue Apr 23 16:08:48 2019 +0800

    [FLINK-11945][table-runtime-blink] Support over aggregation for blink streaming runtime
    
    This closes #8244
---
 .../physical/stream/StreamExecOverAggregate.scala  |  343 ++++++-
 .../flink/table/plan/util/AggregateUtil.scala      |    8 +-
 .../flink/table/plan/util/OverAggregateUtil.scala  |    1 -
 .../sql/validation/OverWindowValidationTest.scala  |   60 ++
 .../stream/sql/agg/OverWindowAggregateTest.scala   |    9 +
 .../table/runtime/harness/HarnessTestBase.scala    |  111 +++
 .../runtime/harness/OverWindowHarnessTest.scala    |  953 ++++++++++++++++++
 .../runtime/stream/sql/OverWindowITCase.scala      | 1025 ++++++++++++++++++++
 .../KeyedProcessFunctionWithCleanupState.java      |   11 +
 .../AbstractRowTimeUnboundedPrecedingOver.java     |  266 +++++
 .../ProcTimeRangeBoundedPrecedingFunction.java     |  246 +++++
 .../over/ProcTimeRowsBoundedPrecedingFunction.java |  235 +++++
 .../over/ProcTimeUnboundedPrecedingFunction.java   |  126 +++
 .../over/RowTimeRangeBoundedPrecedingFunction.java |  289 ++++++
 .../RowTimeRangeUnboundedPrecedingFunction.java    |   79 ++
 .../over/RowTimeRowsBoundedPrecedingFunction.java  |  301 ++++++
 .../RowTimeRowsUnboundedPrecedingFunction.java     |   71 ++
 .../apache/flink/table/type/TypeConverters.java    |   17 +-
 .../table/runtime/util/BaseRowHarnessAssertor.java |   20 +-
 .../table/runtime/util/StreamRecordUtils.java      |   45 +
 20 files changed, 4189 insertions(+), 27 deletions(-)
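
As an illustration of what this adds, the query below sketches a bounded ROWS OVER aggregation of the kind exercised by the new OverWindowHarnessTest and OverWindowITCase. It is a minimal sketch only: the table name, the column names, and the pre-built env/tEnv environments (supplied by StreamingTestBase in the tests) are assumptions for illustration, not part of this commit.

// Minimal sketch (assumes env and tEnv are set up as in StreamingTestBase;
// table/column names are illustrative). A bounded ROWS OVER aggregation like
// this is translated by StreamExecOverAggregate into a KeyedProcessOperator
// wrapping ProcTimeRowsBoundedPrecedingFunction.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

def boundedRowsOverExample(
    env: StreamExecutionEnvironment,
    tEnv: StreamTableEnvironment): Unit = {
  // append-only source with an appended processing-time attribute
  val t = env
    .fromCollection(Seq((1L, "aaa", 1L), (1L, "bbb", 10L), (2L, "aaa", 2L)))
    .toTable(tEnv, 'id, 'key, 'v, 'proctime)
  tEnv.registerTable("T", t)

  // min/max over the current row and the preceding row, per key
  val result = tEnv.sqlQuery(
    """
      |SELECT id, key, v,
      |  MIN(v) OVER (PARTITION BY key ORDER BY proctime
      |    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
      |  MAX(v) OVER (PARTITION BY key ORDER BY proctime
      |    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
      |FROM T
    """.stripMargin)

  result.toAppendStream[Row].print()
}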

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index 93e6831..896d97d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -17,25 +17,36 @@
  */
 package org.apache.flink.table.plan.nodes.physical.stream
 
-import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
 import org.apache.flink.table.CalcitePair
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGeneratorContext
+import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.plan.util.RelExplainUtil
+import org.apache.flink.table.plan.rules.physical.stream.StreamExecRetractionRules
+import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList
+import org.apache.flink.table.plan.util.{KeySelectorUtil, OverAggregateUtil, RelExplainUtil}
+import org.apache.flink.table.runtime.over._
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
 
 import java.util
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
   * Stream physical RelNode for time-based over [[Window]].
@@ -96,7 +107,7 @@ class StreamExecOverAggregate(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val overWindow: Group = logicWindow.groups.get(0)
-    val constants: Seq[RexLiteral] = logicWindow.constants
+    val constants: Seq[RexLiteral] = logicWindow.constants.asScala
     val partitionKeys: Array[Int] = overWindow.keys.toArray
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
@@ -123,7 +134,7 @@ class StreamExecOverAggregate(
   //~ ExecNode methods -----------------------------------------------------------
 
   override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
-    getInputs.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+    getInputs.asScala.map(_.asInstanceOf[ExecNode[StreamTableEnvironment, _]]).asJava
   }
 
   override def replaceInputNode(
@@ -134,6 +145,324 @@ class StreamExecOverAggregate(
 
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val tableConfig = tableEnv.getConfig
+
+    if (logicWindow.groups.size > 1) {
+      throw new TableException(
+          "All aggregates must be computed on the same window.")
+    }
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+          "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+          "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+    if (inputIsAccRetract) {
+      throw new TableException(
+          "Retraction on Over window aggregation is not supported yet. " +
+            "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+    }
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
+    val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+          "OVER windows can only be applied on time attributes.")
+    }
+
+    val codeGenCtx = CodeGeneratorContext(tableConfig)
+    val aggregateCalls = logicWindow.groups.get(0).getAggregateCalls(logicWindow).asScala
+    val isRowsClause = overWindow.isRows
+    val constants = logicWindow.constants.asScala
+    val constantTypes = constants.map(c => FlinkTypeFactory.toInternalType(c.getType))
+
+    val fieldNames = inputRowType.getFieldNames.asScala
+    val fieldTypes = inputRowType.getFieldList.asScala
+      .map(c => FlinkTypeFactory.toInternalType(c.getType))
+
+    val inRowType = FlinkTypeFactory.toInternalRowType(inputRel.getRowType)
+    val outRowType = FlinkTypeFactory.toInternalRowType(outputRowType)
+
+    val aggInputType = tableEnv.getTypeFactory.buildRelDataType(
+      fieldNames ++ constants.indices.map(i => "TMP" + i),
+      fieldTypes ++ constantTypes)
+
+    val overProcessFunction = if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded
+      && overWindow.upperBound.isCurrentRow) {
+
+      // unbounded OVER window
+      createUnboundedOverProcessFunction(
+        codeGenCtx,
+        aggregateCalls,
+        constants,
+        aggInputType,
+        rowTimeIdx,
+        isRowsClause,
+        tableConfig,
+        tableEnv.getRelBuilder,
+        tableConfig.getNullCheck)
+
+    } else if (overWindow.lowerBound.isPreceding
+      && !overWindow.lowerBound.isUnbounded
+      && overWindow.upperBound.isCurrentRow) {
+
+      val boundValue = OverAggregateUtil.getBoundary(logicWindow, overWindow.lowerBound)
+
+      if (boundValue.isInstanceOf[BigDecimal]) {
+        throw new TableException(
+            "the specified bound value is of DECIMAL type, which is not supported yet.")
+      }
+      // bounded OVER window
+      val precedingOffset = -1 * boundValue.asInstanceOf[Long] + (if (isRowsClause) 1 else 0)
+
+      createBoundedOverProcessFunction(
+        codeGenCtx,
+        aggregateCalls,
+        constants,
+        aggInputType,
+        rowTimeIdx,
+        isRowsClause,
+        precedingOffset,
+        tableConfig,
+        tableEnv.getRelBuilder,
+        tableConfig.getNullCheck)
+
+    } else {
+      throw new TableException(
+          "OVER RANGE FOLLOWING windows are not supported yet.")
+    }
+
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val inputTypeInfo = inRowType.toTypeInfo
+
+    val selector = KeySelectorUtil.getBaseRowSelector(partitionKeys, inputTypeInfo)
+
+    val returnTypeInfo = outRowType.toTypeInfo
+      .asInstanceOf[BaseRowTypeInfo]
+    // partitioned aggregation
+
+    val operator = new KeyedProcessOperator(overProcessFunction)
+
+    val ret = new OneInputTransformation(
+      inputDS,
+      "OverAggregate",
+      operator,
+      returnTypeInfo,
+      inputDS.getParallelism)
+
+    if (partitionKeys.isEmpty) {
+      ret.setParallelism(1)
+      ret.setMaxParallelism(1)
+    }
+
+    // set KeyType and Selector for state
+    ret.setStateKeySelector(selector)
+    ret.setStateKeyType(selector.getProducedType)
+    ret
+  }
+
+  /**
+    * Creates a ProcessFunction for an unbounded OVER window to evaluate the final aggregate value.
+    *
+    * @param ctx            code generator context
+    * @param aggregateCalls physical calls to aggregate functions and their output field names
+    * @param constants      the constants in the aggregates' parameters, such as sum(1)
+    * @param aggInputType   physical type of the input row, which consists of the input and constants.
+    * @param rowTimeIdx     the index of the rowtime field, or None in case of processing time.
+    * @param isRowsClause   flag indicating whether the OVER clause is a ROWS clause
+    */
+  private def createUnboundedOverProcessFunction(
+      ctx: CodeGeneratorContext,
+      aggregateCalls: Seq[AggregateCall],
+      constants: Seq[RexLiteral],
+      aggInputType: RelDataType,
+      rowTimeIdx: Option[Int],
+      isRowsClause: Boolean,
+      tableConfig: TableConfig,
+      relBuilder: RelBuilder,
+      nullCheck: Boolean): KeyedProcessFunction[BaseRow, BaseRow, BaseRow] = {
+
+    val needRetraction = false
+    val aggInfoList = transformToStreamAggregateInfoList(
+      aggregateCalls,
+      // use aggInputType which considers constants as input instead of inputSchema.relDataType
+      aggInputType,
+      Array.fill(aggregateCalls.size)(needRetraction),
+      needInputCount = needRetraction,
+      isStateBackendDataViews = true)
+
+    val fieldTypes = inputRowType.getFieldList.asScala.
+      map(c => FlinkTypeFactory.toInternalType(c.getType)).toArray
+
+    val generator = new AggsHandlerCodeGenerator(
+      ctx,
+      relBuilder,
+      fieldTypes,
+      copyInputField = false)
+
+    val genAggsHandler = generator
+      .needAccumulate()
+      // over agg code gen must pass the constants
+      .withConstants(constants)
+      .generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList)
+
+    val flattenAccTypes = aggInfoList.getAccTypes.map(
+      TypeConverters.createInternalTypeFromTypeInfo)
+
+    if (rowTimeIdx.isDefined) {
+      if (isRowsClause) {
+        // ROWS unbounded over process function
+        new RowTimeRowsUnboundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          rowTimeIdx.get)
+      } else {
+        // RANGE unbounded over process function
+        new RowTimeRangeUnboundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          rowTimeIdx.get)
+      }
+    } else {
+      new ProcTimeUnboundedPrecedingFunction(
+        tableConfig.getMinIdleStateRetentionTime,
+        tableConfig.getMaxIdleStateRetentionTime,
+        genAggsHandler,
+        flattenAccTypes)
+    }
+  }
+
+  /**
+    * Creates a ProcessFunction for a bounded OVER window (ROWS or RANGE clause) to evaluate
+    * the final aggregate value.
+    *
+    * @param ctx            code generator context
+    * @param aggregateCalls physical calls to aggregate functions and their output field names
+    * @param constants      the constants in the aggregates' parameters, such as sum(1)
+    * @param aggInputType   physical type of the input row, which consists of the input and constants.
+    * @param rowTimeIdx     the index of the rowtime field, or None in case of processing time.
+    * @param isRowsClause   flag indicating whether the OVER clause is a ROWS clause
+    */
+  private def createBoundedOverProcessFunction(
+      ctx: CodeGeneratorContext,
+      aggregateCalls: Seq[AggregateCall],
+      constants: Seq[RexLiteral],
+      aggInputType: RelDataType,
+      rowTimeIdx: Option[Int],
+      isRowsClause: Boolean,
+      precedingOffset: Long,
+      tableConfig: TableConfig,
+      relBuilder: RelBuilder,
+      nullCheck: Boolean): KeyedProcessFunction[BaseRow, BaseRow, BaseRow] = {
+
+    val needRetraction = true
+    val aggInfoList = transformToStreamAggregateInfoList(
+      aggregateCalls,
+      // use aggInputType which considers constants as input instead of inputSchema.relDataType
+      aggInputType,
+      Array.fill(aggregateCalls.size)(needRetraction),
+      needInputCount = needRetraction,
+      isStateBackendDataViews = true)
+
+    val fieldTypes = inputRowType.getFieldList.asScala.
+      map(c => FlinkTypeFactory.toInternalType(c.getType)).toArray
+
+    val generator = new AggsHandlerCodeGenerator(
+      ctx,
+      relBuilder,
+      fieldTypes,
+      copyInputField = false)
+
+
+    val genAggsHandler = generator
+      .needRetract()
+      .needAccumulate()
+      // over agg code gen must pass the constants
+      .withConstants(constants)
+      .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
+
+    val flattenAccTypes = aggInfoList.getAccTypes.map(
+      TypeConverters.createInternalTypeFromTypeInfo)
+
+    if (rowTimeIdx.isDefined) {
+      if (isRowsClause) {
+        new RowTimeRowsBoundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          precedingOffset,
+          rowTimeIdx.get)
+      } else {
+        new RowTimeRangeBoundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          precedingOffset,
+          rowTimeIdx.get)
+      }
+    } else {
+      if (isRowsClause) {
+        new ProcTimeRowsBoundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          precedingOffset)
+      } else {
+        new ProcTimeRangeBoundedPrecedingFunction(
+          tableConfig.getMinIdleStateRetentionTime,
+          tableConfig.getMaxIdleStateRetentionTime,
+          genAggsHandler,
+          flattenAccTypes,
+          fieldTypes,
+          precedingOffset)
+      }
+    }
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 7cc9646..bd5dcec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.table.`type`.InternalTypes._
-import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, RowType, TypeConverters}
+import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, TypeConverters}
 import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
@@ -34,8 +34,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
-import org.apache.flink.table.typeutils.{BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
-
+import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rex.RexInputRef
@@ -43,7 +42,6 @@ import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
 import org.apache.calcite.tools.RelBuilder
-
 import java.util
 
 import scala.collection.JavaConversions._
@@ -490,7 +488,7 @@ object AggregateUtil extends Enumeration {
             s"Please re-check the data type.")
       }
     } else {
-      TypeConverters.createExternalTypeInfoFromInternalType(new RowType(argTypes: _*))
+      new BaseRowTypeInfo(argTypes: _*)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
index 4017b52..9320f19 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/OverAggregateUtil.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.plan.util
 
 import org.apache.flink.table.JArrayList
-
 import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
 import org.apache.calcite.rel.core.Window
 import org.apache.calcite.rel.core.Window.Group
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..71b05ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.util.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+
+  private val streamUtil = streamTestUtil()
+  streamUtil.addDataStream[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime)
+
+  /**
+    * All aggregates must be computed on the same window.
+    */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  /**
+    * An OVER clause is necessary for the [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidOverAggregation(): Unit = {
+    streamUtil.addFunction("overAgg", new OverAgg0)
+
+    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
+
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
index e9b5a77..0905cbd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/agg/OverWindowAggregateTest.scala
@@ -417,6 +417,15 @@ class OverWindowAggregateTest extends TableTestBase {
         |FROM MyTable
       """.stripMargin
 
+    val sql2 = "SELECT " +
+      "a, " +
+      "SUM(c) OVER w1, " +
+      "MIN(c) OVER w2 " +
+      "FROM MyTable " +
+      "WINDOW w1 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)," +
+      "w2 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)"
+
+    verifyPlanIdentical(sql, sql2)
     util.verifyPlan(sql)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
new file mode 100644
index 0000000..5b50b7b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.{CheckpointingOptions, Configuration}
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.runtime.utils.StreamingTestBase
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
+
+  private val classLoader = Thread.currentThread.getContextClassLoader
+
+  protected def getStateBackend: StateBackend = {
+    mode match {
+      case HEAP_BACKEND =>
+        val conf = new Configuration()
+        conf.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, true)
+        new MemoryStateBackend().configure(conf, classLoader)
+
+      case ROCKSDB_BACKEND =>
+        new RocksDBStateBackend("file://" + tempFolder.newFolder().getAbsoluteFile)
+    }
+  }
+
+  def createHarnessTester[IN, OUT, KEY](
+      operator: OneInputStreamOperator[IN, OUT],
+      keySelector: KeySelector[IN, KEY],
+      keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+    val harness = new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](
+      operator,
+      keySelector,
+      keyType)
+    harness.setStateBackend(getStateBackend)
+    harness
+  }
+
+  def createHarnessTester(
+      ds: DataStream[_],
+      prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[BaseRow, BaseRow, BaseRow] = {
+
+    val transformation = extractExpectedTransformation(
+      ds.javaStream.getTransformation,
+      prefixOperatorName)
+    val processOperator = transformation.getOperator
+      .asInstanceOf[OneInputStreamOperator[Any, Any]]
+    val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[Any, Any]]
+    val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[Any]]
+
+    createHarnessTester(processOperator, keySelector, keyType)
+      .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[BaseRow, BaseRow, BaseRow]]
+  }
+
+  private def extractExpectedTransformation(
+      t: StreamTransformation[_],
+      prefixOperatorName: String): OneInputTransformation[_, _] = {
+    t match {
+      case one: OneInputTransformation[_, _] =>
+        if (one.getName.startsWith(prefixOperatorName)) {
+          one
+        } else {
+          extractExpectedTransformation(one.getInput, prefixOperatorName)
+        }
+      case _ => throw new Exception(
+        s"Can not find the expected $prefixOperatorName transformation")
+    }
+  }
+
+  def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] = {
+    elements.filter(e => !e.isInstanceOf[Watermark]).toList
+  }
+}
+
+object HarnessTestBase {
+
+  @Parameterized.Parameters(name = "StateBackend={0}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](Array(HEAP_BACKEND), Array(ROCKSDB_BACKEND))
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
new file mode 100644
index 0000000..e515138
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -0,0 +1,953 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.util.StreamRecordUtils.baserow
+import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow
+import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
+
+  @Test
+  def testProcTimeBoundedRowsOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT currtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    // register cleanup timer with 3001
+    testHarness.setProcessingTime(1)
+
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 1L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "bbb", 10L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 2L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 3L: JLong, null)))
+
+    // register cleanup timer with 4100
+    testHarness.setProcessingTime(1100)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "bbb", 20L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 4L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 5L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "aaa", 6L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(1L: JLong, "bbb", 30L: JLong, null)))
+
+    // register cleanup timer with 6001
+    testHarness.setProcessingTime(3001)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "aaa", 7L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "aaa", 8L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "aaa", 9L: JLong, null)))
+
+    // trigger cleanup timer and register cleanup timer with 9002
+    testHarness.setProcessingTime(6002)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "aaa", 10L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "bbb", 40L: JLong, null)))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 3L: JLong, null, 2L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 4L: JLong, null, 3L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 5L: JLong, null, 4L: JLong, 5L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "aaa", 6L: JLong, null, 5L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(1L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "aaa", 7L: JLong, null, 6L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "bbb", 40L: JLong, null, 30L: JLong, 40L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+
+    testHarness.close()
+  }
+
+  /**
+    * NOTE: all elements at the same proc timestamp have the same value per key
+    */
+  @Test
+  def testProcTimeBoundedRangeOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT currtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY proctime
+        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY proctime
+        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+    testHarness.open()
+
+    // register cleanup timer with 3003
+    testHarness.setProcessingTime(3)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
+
+    testHarness.setProcessingTime(4)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 2L: JLong, null)))
+
+    // trigger cleanup timer and register cleanup timer with 6003
+    testHarness.setProcessingTime(3003)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 3L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 20L: JLong, null)))
+
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 4L: JLong, null)))
+
+    // register cleanup timer with 9002
+    testHarness.setProcessingTime(6002)
+
+    testHarness.setProcessingTime(7002)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 5L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 6L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 30L: JLong, null)))
+
+    // register cleanup timer with 14002
+    testHarness.setProcessingTime(11002)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
+
+    testHarness.setProcessingTime(11004)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
+
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
+
+    testHarness.setProcessingTime(11006)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value per key
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 5L: JLong, null, 3L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 6L: JLong, null, 3L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 7L: JLong, null, 5L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 9L: JLong, null, 7L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 10L: JLong, null, 7L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+
+    // test for clean-up timer NPE
+    testHarness.setProcessingTime(20000)
+
+    // timer registered for 23000
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
+
+    // update clean-up timer to 25500. Previous timer should not clean up
+    testHarness.setProcessingTime(22500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "ccc", 10L: JLong, null)))
+
+    // 23000 clean-up timer should fire but not fail with an NPE
+    testHarness.setProcessingTime(23001)
+
+    testHarness.close()
+  }
+
+  @Test
+  def testProcTimeUnboundedOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'currtime, 'b, 'c, 'proctime)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT currtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    // register cleanup timer with 4003
+    testHarness.setProcessingTime(1003)
+
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 1L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 10L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 2L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 3L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 20L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 4L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 5L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 6L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 30L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 7L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 8L: JLong, null)))
+
+    // trigger cleanup timer and register cleanup timer with 8003
+    testHarness.setProcessingTime(5003)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 9L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "aaa", 10L: JLong, null)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(0L: JLong, "bbb", 40L: JLong, null)))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 1L: JLong, null, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 10L: JLong, null, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 5L: JLong, null, 1L: JLong, 5L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 6L: JLong, null, 1L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 30L: JLong, null, 10L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 7L: JLong, null, 1L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 8L: JLong, null, 1L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 9L: JLong, null, 1L: JLong, 9L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "aaa", 10L: JLong, null, 1L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(0L: JLong, "bbb", 40L: JLong, null, 10L: JLong, 40L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+    testHarness.close()
+  }
+
+  /**
+    * all elements at the same row-time have the same value per key
+    */
+  @Test
+  def testRowTimeBoundedRangeOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT rowtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    testHarness.processWatermark(1)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2L: JLong, "aaa", 1L: JLong)))
+
+    testHarness.processWatermark(2)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(3L: JLong, "bbb", 10L: JLong)))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+
+    testHarness.processWatermark(4001)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4002L: JLong, "aaa", 3L: JLong)))
+
+    testHarness.processWatermark(4002)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4003L: JLong, "aaa", 4L: JLong)))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4801L: JLong, "bbb", 25L: JLong)))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+    testHarness.processWatermark(19000)
+
+    // test cleanup
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(20000)
+
+    // check that state is removed after max retention time
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
+    testHarness.processWatermark(20010) // compute output
+
+    testHarness.setProcessingTime(4499)
+    testHarness.setProcessingTime(4500)
+
+    // check that state is only removed if all data was processed
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
+
+    testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+
+    testHarness.processWatermark(20020) // schedule emission
+
+    testHarness.setProcessingTime(8499) // clean-up
+    testHarness.setProcessingTime(8500) // clean-up
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      baserow(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong)))
+
+    expectedOutput.add(new StreamRecord(
+      baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeBoundedRowsOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT rowtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+    testHarness.processWatermark(19000)
+
+    // test cleanup
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(20000)
+
+    // check that state is removed after max retention time
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 3000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 4500
+    testHarness.processWatermark(20010) // compute output
+
+    testHarness.setProcessingTime(4499)
+    testHarness.setProcessingTime(4500)
+
+    // check that state is only removed if all data was processed
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20011L: JLong, "ccc", 3L: JLong))) // clean-up 6500
+
+    testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
+
+    testHarness.processWatermark(20020) // schedule emission
+
+    testHarness.setProcessingTime(8499) // clean-up
+    testHarness.setProcessingTime(8500) // clean-up
+
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong)))
+
+    expectedOutput.add(new StreamRecord(
+      baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+    testHarness.close()
+  }
+
+  /**
+    * All elements with the same row-time should get the same aggregate value per key.
+    */
+  @Test
+  def testRowTimeUnboundedRangeOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT rowtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+    testHarness.processWatermark(19000)
+
+    // test cleanup
+    testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+    testHarness.setProcessingTime(3000) // clean up is triggered
+
+    testHarness.processWatermark(20000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20000L: JLong, "ccc", 1L: JLong))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
+
+    testHarness.setProcessingTime(5000) // does not clean up because data is left; new timer at 7000
+    testHarness.processWatermark(20010) // compute output
+
+    testHarness.setProcessingTime(6999) // clean-up timer is 7000, so nothing should happen
+    testHarness.setProcessingTime(7000) // clean-up is triggered
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same row-time have the same value per key
+    expectedOutput.add(new StreamRecord(
+      baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
+
+    expectedOutput.add(new StreamRecord(
+      baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+    testHarness.close()
+  }
+
+  @Test
+  def testRowTimeUnboundedRowsOver(): Unit = {
+
+    val data = new mutable.MutableList[(Long, String, Long)]
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val t = env.fromCollection(data).toTable(tEnv, 'rowtime, 'b, 'c)
+    tEnv.registerTable("T", t)
+
+    val sql =
+      """
+        |SELECT rowtime, b, c,
+        | min(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+        | max(c) OVER
+        |   (PARTITION BY b ORDER BY rowtime
+        |   ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+        |FROM T
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
+    val assertor = new BaseRowHarnessAssertor(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.LONG, Types.LONG))
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(801L: JLong, "aaa", 1L: JLong)))
+
+    testHarness.processWatermark(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(2501L: JLong, "bbb", 10L: JLong)))
+
+    testHarness.processWatermark(4000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 2L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "aaa", 3L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4001L: JLong, "bbb", 20L: JLong)))
+
+    testHarness.processWatermark(4800)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(4801L: JLong, "aaa", 4L: JLong)))
+
+    testHarness.processWatermark(6500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 5L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "aaa", 6L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(6501L: JLong, "bbb", 30L: JLong)))
+
+    testHarness.processWatermark(7000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(7001L: JLong, "aaa", 7L: JLong)))
+
+    testHarness.processWatermark(8000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(8001L: JLong, "aaa", 8L: JLong)))
+
+    testHarness.processWatermark(12000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 9L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "aaa", 10L: JLong)))
+    testHarness.processElement(new StreamRecord(
+      binaryrow(12001L: JLong, "bbb", 40L: JLong)))
+
+    testHarness.processWatermark(19000)
+
+    // test cleanup
+    testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing should happen
+    testHarness.setProcessingTime(3000) // clean up is triggered
+
+    testHarness.processWatermark(20000)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20000L: JLong, "ccc", 2L: JLong))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20001L: JLong, "ccc", 1L: JLong))) // clean-up 5000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      binaryrow(20002L: JLong, "ccc", 2L: JLong))) // clean-up 5000
+
+    testHarness.setProcessingTime(5000) // does not clean up because data is left; new timer at 7000
+    testHarness.processWatermark(20010) // compute output
+
+    testHarness.setProcessingTime(6999) // clean-up timer is 7000, so nothing should happen
+    testHarness.setProcessingTime(7000) // clean-up is triggered
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(
+      baserow(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong)))
+
+    expectedOutput.add(new StreamRecord(
+      baserow(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong)))
+    expectedOutput.add(new StreamRecord(
+      baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
+    testHarness.close()
+  }
+}
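
For orientation, the clean-up instants referenced in the test comments above (3000, 4500, 6500, 8500, ...) follow from the idle state retention configured as withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)): registerProcessingCleanupTimer() schedules clean-up at roughly processingTime + maxRetentionTime and, judging from the timestamps used in these tests, only moves an already registered timer once processingTime + minRetentionTime has passed it. A rough sketch of that arithmetic, with a hypothetical helper name used purely for illustration:

    // hypothetical helper mirroring the assumed re-registration rule
    final class CleanupTimeSketch {
        static long nextCleanupTime(long now, Long registered) {
            long minRetention = 1000L;  // Time.seconds(1)
            long maxRetention = 2000L;  // Time.seconds(2)
            if (registered != null && now + minRetention <= registered) {
                // the registered timer is still far enough in the future, keep it
                return registered;
            }
            // otherwise (re-)register at now + maxRetention
            return now + maxRetention;
        }
    }

For example, nextCleanupTime(1000L, null) yields 3000 ("clean-up 3000" above) and nextCleanupTime(2500L, 3000L) yields 4500 ("clean-up 4500").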
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..72ca720
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull, CountPairs, LargerThanCount}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverWithBultinProctime(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      " first_value(d) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
+      " last_value(d) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
+      "  SUM(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
+      "  MIN(c) OVER (" +
+      "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hallo,Hallo,0,0",
+      "2,Hallo,Hallo Welt,1,0",
+      "2,Hallo,Hallo Welt wie,3,0",
+      "3,Hallo,Hallo Welt wie gehts?,6,0",
+      "3,Hallo,ABC,10,0",
+      "3,Hallo,BCD,15,0",
+      "4,Hallo,CDE,21,0",
+      "4,Hallo,DEF,28,0",
+      "4,Hallo,EFG,36,0",
+      "4,Hallo,FGH,45,0",
+      "5,Hallo,GHI,55,0",
+      "5,Hallo Welt,HIJ,66,1",
+      "5,Hallo Welt wie,IJK,77,2",
+      "5,Hallo Welt wie gehts?,JKL,88,3",
+      "5,ABC,KLM,99,4")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+    val t1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "first_value(b) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)," +
+      "last_value(b) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)," +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,7,7,1,7", "Hello World,7,8,2,15", "Hello World,7,20,3,35",
+      "Hello,1,1,1,1", "Hello,1,2,2,3", "Hello,1,3,3,6", "Hello,1,4,4,10", "Hello,1,5,5,15",
+      "Hello,1,6,6,21")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+    val t1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sql =
+      """
+        |SELECT c, sum1, maxnull
+        |FROM (
+        | SELECT c,
+        |  max(cast(null as varchar)) OVER
+        |   (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+        |   as maxnull,
+        |  sum(1) OVER
+        |   (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+        |   as sum1
+        | FROM T1
+        |)
+      """.stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,1,null", "Hello World,2,null", "Hello World,3,null",
+      "Hello,1,null", "Hello,2,null", "Hello,3,null", "Hello,4,null",
+      "Hello,5,null", "Hello,6,null")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    tEnv.getConfig.withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
+    val t1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY proctime  RANGE UNBOUNDED preceding), " +
+      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+    val t1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+      "from T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+      " FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
+      "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
+      "Hello,3,0,2,3,4,9",
+      "Hello,4,0,3,4,2,7",
+      "Hello,5,1,4,5,2,9",
+      "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
+      "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18",
+      "Hello World,17,3,7,7,3,21",
+      "Hello World,7,1,7,7,1,7",
+      "Hello World,77,3,7,7,3,21",
+      "Hello World,18,1,7,7,1,7",
+      "Hello World,8,2,7,8,2,15",
+      "Hello World,20,1,20,20,1,20")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sqlQuery = "SELECT " +
+      " c, a, " +
+      "  LTCNT(a, CAST('4' AS BIGINT)) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+      "  COUNT(1) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,0,1,1", "Hello,1,0,2,2", "Hello,1,0,3,3",
+      "Hello,2,0,3,4", "Hello,2,0,3,5", "Hello,2,0,3,6",
+      "Hello,3,0,3,7", "Hello,4,0,3,9", "Hello,5,1,3,12",
+      "Hello,6,2,3,15",
+      "Hello World,7,1,1,7", "Hello World,7,2,2,14", "Hello World,7,3,3,21",
+      "Hello World,7,3,3,21", "Hello World,8,3,3,22", "Hello World,20,3,3,35")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "  c, b, " +
+      "  COUNT(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+      "  SUM(a) " +
+      "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
+      " FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))), // early row
+      Right(3L),
+      Left((2L, (2L, 2, "Hello"))), // late row
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(7L),
+      Left((9L, (9L, 9, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((8L, (8L, 8, "Hello World"))),
+      Right(20L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, a, " +
+      "  COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
+      "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
+      "FROM T1"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7",
+      "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15", "Hello World,7,3,18",
+      "Hello World,8,3,21", "Hello World,8,3,23",
+      "Hello World,9,3,25",
+      "Hello World,20,3,37")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "  LTCNT(b, CAST('4' AS BIGINT)) OVER(" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  SUM(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (" +
+      "    PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      s"1,1,Hello,0,6,3,${6.0/3},3,1",
+      s"1,2,Hello,0,6,3,${6.0/3},3,1",
+      s"1,3,Hello world,0,6,3,${6.0/3},3,1",
+      s"1,1,Hi,0,7,4,${7.0/4},3,1",
+      s"2,1,Hello,0,1,1,${1.0/1},1,1",
+      s"2,2,Hello world,0,6,3,${6.0/3},3,1",
+      s"2,3,Hello world,0,6,3,${6.0/3},3,1",
+      s"1,4,Hello world,0,11,5,${11.0/5},4,1",
+      s"1,5,Hello world,3,29,8,${29.0/8},7,1",
+      s"1,6,Hello world,3,29,8,${29.0/8},7,1",
+      s"1,7,Hello world,3,29,8,${29.0/8},7,1",
+      s"2,4,Hello world,1,15,5,${15.0/5},5,1",
+      s"2,5,Hello world,1,15,5,${15.0/5},5,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "LTCNT(b, CAST('4' AS BIGINT)) over(" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Left(14000021L, (1, 6L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000024L, (3, 5L, "Hello world")),
+      Left(14000026L, (1, 7L, "Hello world")),
+      Left(14000025L, (1, 8L, "Hello world")),
+      Left(14000022L, (1, 9L, "Hello world")),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerFunction("LTCNT", new LargerThanCount)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      s"1,2,Hello,0,2,1,${2.0/1},2,2",
+      s"1,3,Hello world,0,5,2,${5.0/2},3,2",
+      s"1,1,Hi,0,6,3,${6.0/3},3,1",
+      s"2,1,Hello,0,1,1,${1.0/1},1,1",
+      s"2,2,Hello world,0,3,2,${3.0/2},2,1",
+      s"3,1,Hello,0,1,1,${1.0/1},1,1",
+      s"3,2,Hello world,0,3,2,${3.0/2},2,1",
+      s"1,5,Hello world,1,11,4,${11.0/4},5,1",
+      s"1,6,Hello world,2,17,5,${17.0/5},6,1",
+      s"1,9,Hello world,3,26,6,${26.0/6},9,1",
+      s"1,8,Hello world,4,34,7,${34.0/7},9,1",
+      s"1,7,Hello world,5,41,8,${41.0/8},9,1",
+      s"2,5,Hello world,1,8,3,${8.0/3},5,1",
+      s"3,5,Hello world,1,8,3,${8.0/3},5,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L))
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      s"2,1,Hello,1,1,${1.0/1},1,1",
+      s"1,1,Hello,7,4,${7.0/4},3,1",
+      s"1,2,Hello,7,4,${7.0/4},3,1",
+      s"1,3,Hello world,7,4,${7.0/4},3,1",
+      s"2,2,Hello world,12,6,${12.0/6},3,1",
+      s"2,3,Hello world,12,6,${12.0/6},3,1",
+      s"1,1,Hi,13,7,${13.0/7},3,1",
+      s"1,4,Hello world,17,8,${17.0/8},4,1",
+      s"1,5,Hello world,35,11,${35.0/11},7,1",
+      s"1,6,Hello world,35,11,${35.0/11},7,1",
+      s"1,7,Hello world,35,11,${35.0/11},7,1",
+      s"2,4,Hello world,44,13,${44.0/13},7,1",
+      s"2,5,Hello world,44,13,${44.0/13},7,1")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "  SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+      "  MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+      "FROM T1"
+
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 2L, "Hello")),
+      Left(14000002L, (3, 5L, "Hello")),
+      Left(14000003L, (1, 3L, "Hello")),
+      Left(14000004L, (3, 7L, "Hello world")),
+      Left(14000007L, (4, 9L, "Hello world")),
+      Left(14000008L, (5, 8L, "Hello world")),
+      Right(14000010L),
+      // this element will be discarded because it is late
+      Left(14000008L, (6, 8L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (6, 8L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      s"2,2,Hello,2,1,${2.0/1},2,2",
+      s"3,5,Hello,7,2,${7.0/2},5,2",
+      s"1,3,Hello,10,3,${10.0/3},5,2",
+      s"3,7,Hello world,17,4,${17.0/4},7,2",
+      s"1,1,Hi,18,5,${18.0/5},7,1",
+      s"4,9,Hello world,27,6,${27.0/6},9,1",
+      s"5,8,Hello world,35,7,${35.0/7},9,1",
+      s"6,8,Hello world,43,8,${43.0/8},9,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /** Tests an unbounded event-time over window with partition by. **/
+  @Test
+  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+    val sqlQuery = "SELECT a, b, c, " +
+      "SUM(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "count(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "avg(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "max(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row), " +
+      "min(b) over (" +
+      "partition by a order by rowtime rows between unbounded preceding and current row) " +
+      "from T1"
+
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Left(14000003L, (1, 2L, "Hello")),
+      Left(14000004L, (1, 3L, "Hello world")),
+      Left(14000007L, (3, 2L, "Hello world")),
+      Left(14000008L, (2, 2L, "Hello world")),
+      Right(14000010L),
+      // the next 3 elements are late
+      Left(14000008L, (1, 4L, "Hello world")),
+      Left(14000008L, (2, 3L, "Hello world")),
+      Left(14000008L, (3, 3L, "Hello world")),
+      Left(14000012L, (1, 5L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 6L, "Hello world")),
+      // the next 3 elements are late
+      Left(14000019L, (1, 6L, "Hello world")),
+      Left(14000018L, (2, 4L, "Hello world")),
+      Left(14000018L, (3, 4L, "Hello world")),
+      Left(14000022L, (2, 5L, "Hello world")),
+      Left(14000022L, (3, 5L, "Hello world")),
+      Left(14000024L, (1, 7L, "Hello world")),
+      Left(14000023L, (1, 8L, "Hello world")),
+      Left(14000021L, (1, 9L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val t1 = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      s"1,2,Hello,2,1,${2.0/1},2,2",
+      s"1,3,Hello world,5,2,${5.0/2},3,2",
+      s"1,1,Hi,6,3,${6.0/3},3,1",
+      s"2,1,Hello,1,1,${1.0/1},1,1",
+      s"2,2,Hello world,3,2,${3.0/2},2,1",
+      s"3,1,Hello,1,1,${1.0/1},1,1",
+      s"3,2,Hello world,3,2,${3.0/2},2,1",
+      s"1,5,Hello world,11,4,${11.0/4},5,1",
+      s"1,6,Hello world,17,5,${17.0/5},6,1",
+      s"1,9,Hello world,26,6,${26.0/6},9,1",
+      s"1,8,Hello world,34,7,${34.0/7},9,1",
+      s"1,7,Hello world,41,8,${41.0/8},9,1",
+      s"2,5,Hello world,8,3,${8.0/3},5,1",
+      s"3,5,Hello world,8,3,${8.0/3},5,1"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  COUNT(e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "  SUM(DISTINCT e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "  MIN(DISTINCT e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "FROM MyTable"
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,1,1,1",
+      "2,1,2,2",
+      "2,2,3,1",
+      "3,1,2,2",
+      "3,2,2,2",
+      "3,3,5,2",
+      "4,1,2,2",
+      "4,2,3,1",
+      "4,3,3,1",
+      "4,4,3,1",
+      "5,1,1,1",
+      "5,2,4,1",
+      "5,3,4,1",
+      "5,4,6,1",
+      "5,5,6,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeDistinctUnboundedPartitionedRangeOverWithNullValues(): Unit = {
+    val data = List(
+      (1L, 1, null),
+      (2L, 1, null),
+      (3L, 2, null),
+      (4L, 1, "Hello"),
+      (5L, 1, "Hello"),
+      (6L, 2, "Hello"),
+      (7L, 1, "Hello World"),
+      (8L, 2, "Hello World"),
+      (9L, 2, "Hello World"),
+      (10L, 1, null))
+
+    // use parallelism 1 so that the element order is deterministic and the sum aggregation result is stable
+    env.setParallelism(1)
+
+    val table = failingDataSource(data)
+      .assignAscendingTimestamps(_._1)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    tEnv.registerTable("MyTable", table)
+    tEnv.registerFunction("CntNullNonNull", new CountNullNonNull)
+
+    val sqlQuery = "SELECT " +
+      "  c, " +
+      "  b, " +
+      "  COUNT(DISTINCT c) " +
+      "    OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding), " +
+      "  CntNullNonNull(DISTINCT c) " +
+      "    OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding)" +
+      "FROM " +
+      "  MyTable"
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "null,1,0,0|1", "null,1,0,0|1", "null,2,0,0|1", "null,1,2,2|1",
+      "Hello,1,1,1|1", "Hello,1,1,1|1", "Hello,2,1,1|1",
+      "Hello World,1,2,2|1", "Hello World,2,2,2|1", "Hello World,2,2,2|1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+    val t = failingDataSource(StreamTestData.get5TupleData)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery = "SELECT a, " +
+      "  SUM(DISTINCT e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+      "  MIN(DISTINCT e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
+      "  COLLECT(DISTINCT e) OVER (" +
+      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " +
+      "FROM MyTable"
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,1,1,{1=1}",
+      "2,2,2,{2=1}",
+      "2,3,1,{1=1, 2=1}",
+      "3,2,2,{2=1}",
+      "3,2,2,{2=1}",
+      "3,5,2,{2=1, 3=1}",
+      "4,2,2,{2=1}",
+      "4,3,1,{1=1, 2=1}",
+      "4,3,1,{1=1, 2=1}",
+      "4,3,1,{1=1, 2=1}",
+      "5,1,1,{1=1}",
+      "5,4,1,{1=1, 3=1}",
+      "5,4,1,{1=1, 3=1}",
+      "5,6,1,{1=1, 2=1, 3=1}",
+      "5,5,2,{2=1, 3=1}")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeDistinctPairWithNulls(): Unit = {
+
+    val data = List(
+      ("A", null),
+      ("A", null),
+      ("B", null),
+      (null, "Hello"),
+      ("A", "Hello"),
+      ("A", "Hello"),
+      (null, "Hello World"),
+      (null, "Hello World"),
+      ("A", "Hello World"),
+      ("B", "Hello World"))
+
+    env.setParallelism(1)
+
+    val table = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'proctime)
+    tEnv.registerTable("MyTable", table)
+    tEnv.registerFunction("PairCount", new CountPairs)
+
+    val sqlQuery = "SELECT a, b, " +
+      "  PairCount(a, b) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
+      "  PairCount(DISTINCT a, b) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
+      "FROM MyTable"
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "A,null,1,1",
+      "A,null,2,1",
+      "B,null,3,2",
+      "null,Hello,4,3",
+      "A,Hello,5,4",
+      "A,Hello,6,4",
+      "null,Hello World,7,5",
+      "null,Hello World,8,5",
+      "A,Hello World,9,6",
+      "B,Hello World,10,7")
+    assertEquals(expected, sink.getAppendResults)
+  }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
index a761c5f..5c8217c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 
+import java.io.IOException;
+
 /**
  * A function that processes elements of a stream, and could cleanup state.
  * @param <K> Type of the key.
@@ -79,4 +81,13 @@ public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
 		this.cleanupTimeState.clear();
 	}
 
+	protected Boolean needToCleanupState(Long timestamp) throws IOException {
+		if (stateCleaningEnabled) {
+			Long cleanupTime = cleanupTimeState.value();
+			// check that the triggered timer is the last registered processing time timer.
+			return null != cleanupTime && cleanupTime.equals(timestamp);
+		} else {
+			return false;
+		}
+	}
 }
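
The new needToCleanupState() helper is meant to be called from processing-time timer callbacks: a firing timer should only wipe state if it is the last clean-up timer that was registered for the key. The over-window functions added below use it roughly in this shape (a sketch only; which state handles get cleared is specific to each subclass):

    @Override
    public void onTimer(
            long timestamp,
            KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
            Collector<BaseRow> out) throws Exception {
        if (isProcessingTimeTimer(ctx)) {
            if (needToCleanupState(timestamp)) {
                // drop the per-key state of the concrete function here
                // (accumulators, buffered input rows, clean-up timer state)
            }
            return;
        }
        // event-time timers trigger the actual over-window emission
    }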
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
new file mode 100644
index 0000000..0362976
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * A basic implementation to support unbounded event-time over-window.
+ */
+public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractRowTimeUnboundedPrecedingOver.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final int rowTimeIdx;
+
+	protected transient JoinedRow output;
+	// state to hold the accumulators of the aggregations
+	private transient ValueState<BaseRow> accState;
+	// state to hold rows until the next watermark arrives
+	private transient MapState<Long, List<BaseRow>> inputState;
+	// list to sort timestamps to access rows in timestamp order
+	private transient LinkedList<Long> sortedTimestamps;
+
+	protected transient AggsHandleFunction function;
+
+	public AbstractRowTimeUnboundedPrecedingOver(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.rowTimeIdx = rowTimeIdx;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		sortedTimestamps = new LinkedList<Long>();
+
+		// initialize accumulator state
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> accStateDesc =
+			new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(accStateDesc);
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState",
+			Types.LONG,
+			rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+		initCleanupTimeState("RowTimeUnboundedOverCleanupTime");
+	}
+
+	/**
+	 * Puts an element from the input stream into state if it is not late.
+	 * Registers a timer for the next watermark.
+	 *
+	 * @param input The input value.
+	 * @param ctx   A {@link Context} that allows querying the timestamp of the element and getting
+	 *              TimerService for registering timers and querying the time. The
+	 *              context is only valid during the invocation of this method, do not store it.
+	 * @param out   The collector for returning result values.
+	 * @throws Exception
+	 */
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		long timestamp = input.getLong(rowTimeIdx);
+		long curWatermark = ctx.timerService().currentWatermark();
+
+		// late records are discarded; only rows that are not late are buffered below
+		if (timestamp > curWatermark) {
+			// ensure each key registers at most one timer per watermark
+			// the default watermark is Long.MIN_VALUE; use zero when the watermark is negative to avoid overflow
+			long triggerTs = curWatermark < 0 ? 0 : curWatermark + 1;
+			ctx.timerService().registerEventTimeTimer(triggerTs);
+
+			// put row into state
+			List<BaseRow> rowList = inputState.get(timestamp);
+			if (rowList == null) {
+				rowList = new ArrayList<BaseRow>();
+			}
+			rowList.add(input);
+			inputState.put(timestamp, rowList);
+		}
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (isProcessingTimeTimer(ctx)) {
+			if (needToCleanupState(timestamp)) {
+
+				// we check whether there are still records which have not been processed yet
+				boolean noRecordsToProcess = !inputState.contains(timestamp);
+				if (noRecordsToProcess) {
+					// we clean the state
+					cleanupState(inputState, accState);
+					function.cleanup();
+				} else {
+					// There are records left to process because a watermark has not been received yet.
+					// This would only happen if the input stream has stopped. So we don't need to clean up.
+					// We leave the state as it is and schedule a new cleanup timer
+					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+				}
+			}
+			return;
+		}
+
+		Iterator<Long> keyIterator = inputState.keys().iterator();
+		if (keyIterator.hasNext()) {
+			Long curWatermark = ctx.timerService().currentWatermark();
+			boolean existEarlyRecord = false;
+
+			// sort the record timestamps
+			do {
+				Long recordTime = keyIterator.next();
+				// only take timestamps smaller/equal to the watermark
+				if (recordTime <= curWatermark) {
+					insertToSortedList(recordTime);
+				} else {
+					existEarlyRecord = true;
+				}
+			} while (keyIterator.hasNext());
+
+			// get last accumulator
+			BaseRow lastAccumulator = accState.value();
+			if (lastAccumulator == null) {
+				// initialize accumulator
+				lastAccumulator = function.createAccumulators();
+			}
+			// set accumulator in function context first
+			function.setAccumulators(lastAccumulator);
+
+			// emit the rows in order
+			while (!sortedTimestamps.isEmpty()) {
+				Long curTimestamp = sortedTimestamps.removeFirst();
+				List<BaseRow> curRowList = inputState.get(curTimestamp);
+				if (curRowList != null) {
+					// process rows with the same timestamp; the mechanism differs between ROWS and RANGE
+					processElementsWithSameTimestamp(curRowList, out);
+				} else {
+					// Ignore rows with this timestamp if the state has already been cleared.
+					LOG.warn("The state is cleared because of state ttl. " +
+						"This will result in incorrect results. " +
+						"You can increase the state ttl to avoid this.");
+				}
+				inputState.remove(curTimestamp);
+			}
+
+			// update acc state
+			lastAccumulator = function.getAccumulators();
+			accState.update(lastAccumulator);
+
+			// if there are rows with timestamp > watermark, register a timer for the next watermark
+			if (existEarlyRecord) {
+				ctx.timerService().registerEventTimeTimer(curWatermark + 1);
+			}
+		}
+
+		// update cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+	}
+
+	/**
+	 * Inserts timestamps in order into a linked list.
+	 * If timestamps arrive in order (as is the case when using the RocksDB state backend) this is just
+	 * an O(1) append.
+	 */
+	private void insertToSortedList(Long recordTimestamp) {
+		ListIterator<Long> listIterator = sortedTimestamps.listIterator(sortedTimestamps.size());
+		boolean isContinue = true;
+		while (listIterator.hasPrevious() && isContinue) {
+			Long timestamp = listIterator.previous();
+			if (recordTimestamp >= timestamp) {
+				listIterator.next();
+				listIterator.add(recordTimestamp);
+				isContinue = false;
+			}
+		}
+
+		if (isContinue) {
+			sortedTimestamps.addFirst(recordTimestamp);
+		}
+	}
+
+	/**
+	 * Processes rows that share the same timestamp. The mechanism differs between
+	 * ROWS and RANGE windows.
+	 */
+	protected abstract void processElementsWithSameTimestamp(
+		List<BaseRow> curRowList,
+		Collector<BaseRow> out) throws Exception;
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
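
A side note on the sortedTimestamps handling in AbstractRowTimeUnboundedPrecedingOver: the ordered insert walks the linked list from the tail, so timestamps arriving in ascending order degenerate to an O(1) append. The following standalone sketch is not part of the commit; it only mirrors the insertion strategy on plain longs, with invented class and method names:

    import java.util.LinkedList;
    import java.util.ListIterator;

    // Illustration only: same tail-first insertion as insertToSortedList, on plain longs.
    public class SortedTimestampInsertDemo {

        // Insert ts so the list stays ascending; this is an O(1) append when input is already ordered.
        static void insertSorted(LinkedList<Long> sorted, long ts) {
            ListIterator<Long> it = sorted.listIterator(sorted.size());
            while (it.hasPrevious()) {
                if (it.previous() <= ts) {
                    it.next();       // step forward again to the insertion point
                    it.add(ts);
                    return;
                }
            }
            sorted.addFirst(ts);     // ts is smaller than everything seen so far
        }

        public static void main(String[] args) {
            LinkedList<Long> sorted = new LinkedList<>();
            for (long ts : new long[]{1L, 3L, 2L, 5L, 4L}) {
                insertSorted(sorted, ts);
            }
            System.out.println(sorted);   // [1, 2, 3, 4, 5]
        }
    }
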
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java
new file mode 100644
index 0000000..de53101
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRangeBoundedPrecedingFunction.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for RANGE clause processing-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ * FROM T.
+ */
+public class ProcTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ProcTimeRangeBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final long precedingTimeBoundary;
+
+	private transient ValueState<BaseRow> accState;
+	private transient MapState<Long, List<BaseRow>> inputState;
+
+	private transient AggsHandleFunction function;
+	private transient JoinedRow output;
+
+	public ProcTimeRangeBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingTimeBoundary) {
+		super(minRetentionTime, maxRetentionTime);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingTimeBoundary = precedingTimeBoundary;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		// we keep the elements received in a map state indexed based on their ingestion time
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> mapStateDescriptor = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState", BasicTypeInfo.LONG_TYPE_INFO, rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(mapStateDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> stateDescriptor =
+			new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(stateDescriptor);
+
+		initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		long currentTime = ctx.timerService().currentProcessingTime();
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, currentTime);
+
+		// buffer the incoming event
+
+		// add current element to the window list of elements with corresponding timestamp
+		List<BaseRow> rowList = inputState.get(currentTime);
+		// a null value means that this is the first event received for this timestamp
+		if (rowList == null) {
+			rowList = new ArrayList<BaseRow>();
+			// register timer to process event once the current millisecond passed
+			ctx.timerService().registerProcessingTimeTimer(currentTime + 1);
+		}
+		rowList.add(input);
+		inputState.put(currentTime, rowList);
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (needToCleanupState(timestamp)) {
+			// clean up and return
+			cleanupState(inputState, accState);
+			function.cleanup();
+			return;
+		}
+
+		// remove timestamp set outside of ProcessFunction.
+		((TimestampedCollector) out).eraseTimestamp();
+
+		// we consider the original timestamp of events
+		// that have registered this time trigger 1 ms ago
+
+		long currentTime = timestamp - 1;
+
+		// get the list of elements of current proctime
+		List<BaseRow> currentElements = inputState.get(currentTime);
+
+		// Expired clean-up timers pass the needToCleanupState check.
+		// Perform a null check to verify that we have data to process.
+		if (null == currentElements) {
+			return;
+		}
+
+		// initialize the accumulators
+		BaseRow accumulators = accState.value();
+
+		if (null == accumulators) {
+			accumulators = function.createAccumulators();
+		}
+
+		// set accumulators in context first
+		function.setAccumulators(accumulators);
+
+		// update the elements to be removed and retract them from aggregators
+		long limit = currentTime - precedingTimeBoundary;
+
+		// we iterate through all elements in the window buffer based on timestamp keys.
+		// When we find timestamps that are no longer of interest, we retrieve the corresponding elements
+		// and retract them. Multiple elements could have been received at the same timestamp.
+		// The removal of old elements happens only once per proctime because onTimer is called only once.
+		Iterator<Long> iter = inputState.keys().iterator();
+		List<Long> markToRemove = new ArrayList<Long>();
+		while (iter.hasNext()) {
+			Long elementKey = iter.next();
+			if (elementKey < limit) {
+				// element key outside of window. Retract values
+				List<BaseRow> elementsRemove = inputState.get(elementKey);
+				if (elementsRemove != null) {
+					int iRemove = 0;
+					while (iRemove < elementsRemove.size()) {
+						BaseRow retractRow = elementsRemove.get(iRemove);
+						function.retract(retractRow);
+						iRemove += 1;
+					}
+				} else {
+					// Does not retract values which are outside of window if the state is cleared already.
+					// Values outside of the window are not retracted if the state has already been cleared.
+					LOG.warn("The state is cleared because of state ttl. " +
+						"This will result in incorrect results. " +
+				}
+
+				// mark element for later removal so that the MapState is not modified while iterating over it
+				markToRemove.add(elementKey);
+			}
+		}
+
+		// removal is done in a second step to avoid concurrent access errors via the MapState iterator
+		int i = 0;
+		while (i < markToRemove.size()) {
+			inputState.remove(markToRemove.get(i));
+			i += 1;
+		}
+
+		// add current elements to the aggregator. Multiple elements might
+		// have arrived at the same proctime;
+		// the same accumulator value will be computed for all of them
+		int iElements = 0;
+		while (iElements < currentElements.size()) {
+			BaseRow input = currentElements.get(iElements);
+			function.accumulate(input);
+			iElements += 1;
+		}
+
+		// build the output and emit it for every event received at this proctime
+		iElements = 0;
+		BaseRow aggValue = function.getValue();
+		while (iElements < currentElements.size()) {
+			BaseRow input = currentElements.get(iElements);
+			output.replace(input, aggValue);
+			out.collect(output);
+			iElements += 1;
+		}
+
+		// update the value of accumulators for future incremental computation
+		accumulators = function.getAccumulators();
+		accState.update(accumulators);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
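
For the retraction boundary computed in onTimer of ProcTimeRangeBoundedPrecedingFunction: with a 4-second RANGE window, every entry buffered at a proctime strictly smaller than currentTime - precedingTimeBoundary is retracted. A minimal standalone sketch of that arithmetic, not part of the commit and with invented values:

    import java.util.Map;
    import java.util.TreeMap;

    // Illustration only: which buffered proctimes fall out of a 4s RANGE window.
    public class RangeRetractBoundaryDemo {
        public static void main(String[] args) {
            long precedingTimeBoundary = 4000L;   // RANGE BETWEEN INTERVAL '4' SECOND PRECEDING
            long currentTime = 10000L;            // proctime the timer fired for (timestamp - 1)
            long limit = currentTime - precedingTimeBoundary;

            // buffered proctime -> number of rows received at that proctime
            Map<Long, Integer> buffered = new TreeMap<>();
            buffered.put(5000L, 2);
            buffered.put(6000L, 1);
            buffered.put(9000L, 3);

            for (Map.Entry<Long, Integer> e : buffered.entrySet()) {
                if (e.getKey() < limit) {
                    System.out.println("retract " + e.getValue() + " row(s) at ts=" + e.getKey());
                } else {
                    System.out.println("keep    " + e.getValue() + " row(s) at ts=" + e.getKey());
                }
            }
            // limit = 6000: rows at ts=5000 are retracted, ts=6000 and ts=9000 stay in the window
        }
    }
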
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
new file mode 100644
index 0000000..fb25e1e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROWS clause processing-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
+ * FROM T.
+ */
+public class ProcTimeRowsBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ProcTimeRowsBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final long precedingOffset;
+
+	private transient AggsHandleFunction function;
+
+	private transient ValueState<BaseRow> accState;
+	private transient MapState<Long, List<BaseRow>> inputState;
+	private transient ValueState<Long> counterState;
+	private transient ValueState<Long> smallestTsState;
+
+	private transient JoinedRow output;
+
+	public ProcTimeRowsBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset) {
+		super(minRetentionTime, maxRetentionTime);
+		Preconditions.checkArgument(precedingOffset > 0);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		// We keep the elements received in a Map state keyed
+		// by the ingestion time in the operator.
+		// We also keep a counter of processed elements
+		// and the timestamp of the oldest element.
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> mapStateDescriptor = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState", BasicTypeInfo.LONG_TYPE_INFO, rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(mapStateDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> stateDescriptor =
+			new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(stateDescriptor);
+
+		ValueStateDescriptor<Long> processedCountDescriptor = new ValueStateDescriptor<Long>(
+			"processedCountState",
+			Types.LONG);
+		counterState = getRuntimeContext().getState(processedCountDescriptor);
+
+		ValueStateDescriptor<Long> smallestTimestampDescriptor = new ValueStateDescriptor<Long>(
+			"smallestTSState",
+			Types.LONG);
+		smallestTsState = getRuntimeContext().getState(smallestTimestampDescriptor);
+
+		initCleanupTimeState("ProcTimeBoundedRowsOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		long currentTime = ctx.timerService().currentProcessingTime();
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, currentTime);
+
+		// initialize state for the processed element
+		BaseRow accumulators = accState.value();
+		if (accumulators == null) {
+			accumulators = function.createAccumulators();
+		}
+		// set accumulators in context first
+		function.setAccumulators(accumulators);
+
+		// get smallest timestamp
+		Long smallestTs = smallestTsState.value();
+		if (smallestTs == null) {
+			smallestTs = currentTime;
+			smallestTsState.update(smallestTs);
+		}
+		// get previous counter value
+		Long counter = counterState.value();
+		if (counter == null) {
+			counter = 0L;
+		}
+
+		if (counter == precedingOffset) {
+			List<BaseRow> retractList = inputState.get(smallestTs);
+			if (retractList != null) {
+				// get the oldest element beyond the buffer size
+				// and, if it exists, retract its value
+				BaseRow retractRow = retractList.get(0);
+				function.retract(retractRow);
+				retractList.remove(0);
+			} else {
+				// Values outside of the window are not retracted if the state has already been cleared.
+				LOG.warn("The state is cleared because of state ttl. " +
+					"This will result in incorrect results. " +
+					"You can increase the state ttl to avoid this.");
+			}
+			// if the list for the reference timestamp is not empty, keep the list
+			if (retractList != null && !retractList.isEmpty()) {
+				inputState.put(smallestTs, retractList);
+			} // if the list for the smallest timestamp is empty, remove it and find the new smallest
+			else {
+				inputState.remove(smallestTs);
+				Iterator<Long> iter = inputState.keys().iterator();
+				long currentTs = 0L;
+				long newSmallestTs = Long.MAX_VALUE;
+				while (iter.hasNext()) {
+					currentTs = iter.next();
+					if (currentTs < newSmallestTs) {
+						newSmallestTs = currentTs;
+					}
+				}
+				smallestTsState.update(newSmallestTs);
+			}
+		} // we update the counter only while the buffer is being filled
+		else {
+			counter += 1;
+			counterState.update(counter);
+		}
+
+		// update map state, counter and timestamp
+		List<BaseRow> currentTimeState = inputState.get(currentTime);
+		if (currentTimeState != null) {
+			currentTimeState.add(input);
+			inputState.put(currentTime, currentTimeState);
+		} else { // add new input
+			List<BaseRow> newList = new ArrayList<BaseRow>();
+			newList.add(input);
+			inputState.put(currentTime, newList);
+		}
+
+		// accumulate current row
+		function.accumulate(input);
+		// update the value of accumulators for future incremental computation
+		accumulators = function.getAccumulators();
+		accState.update(accumulators);
+
+		// prepare output row
+		BaseRow aggValue = function.getValue();
+		output.replace(input, aggValue);
+		out.collect(output);
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (needToCleanupState(timestamp)) {
+			cleanupState(inputState, accState, counterState, smallestTsState);
+			function.cleanup();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
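
The counter and smallest-timestamp bookkeeping in ProcTimeRowsBoundedPrecedingFunction effectively keeps at most precedingOffset + 1 rows per key. A compressed standalone sketch of the same ROWS BETWEEN 1 PRECEDING AND CURRENT ROW behaviour, using a bounded deque instead of keyed state; it is not part of the commit and names and values are invented:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustration only: the oldest row is retracted once the buffer holds precedingOffset + 1 rows.
    public class ProcTimeRowsWindowDemo {
        public static void main(String[] args) {
            int precedingOffset = 1;                   // ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
            Deque<Integer> window = new ArrayDeque<>();

            for (int v : new int[]{5, 3, 8, 1}) {
                if (window.size() == precedingOffset + 1) {
                    window.removeFirst();              // corresponds to function.retract(retractRow)
                }
                window.addLast(v);                     // corresponds to function.accumulate(input)
                int min = window.stream().mapToInt(Integer::intValue).min().getAsInt();
                System.out.println("row " + v + " -> min over " + window + " = " + min);
            }
            // emitted mins: 5, 3, 3, 1
        }
    }
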
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..50b0b78
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+/**
+ * Process Function for processing-time unbounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT currtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class ProcTimeUnboundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+
+	private transient AggsHandleFunction function;
+	private transient ValueState<BaseRow> accState;
+	private transient JoinedRow output;
+
+	public ProcTimeUnboundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes) {
+		super(minRetentionTime, maxRetentionTime);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> stateDescriptor =
+			new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(stateDescriptor);
+
+		initCleanupTimeState("ProcTimeUnboundedOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		BaseRow accumulators = accState.value();
+		if (null == accumulators) {
+			accumulators = function.createAccumulators();
+		}
+		// set accumulators in context first
+		function.setAccumulators(accumulators);
+
+		// accumulate input row
+		function.accumulate(input);
+
+		// update the value of accumulators for future incremental computation
+		accumulators = function.getAccumulators();
+		accState.update(accumulators);
+
+		// prepare output row
+		BaseRow aggValue = function.getValue();
+		output.replace(input, aggValue);
+		out.collect(output);
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (needToCleanupState(timestamp)) {
+			cleanupState(accState);
+			function.cleanup();
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
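
ProcTimeUnboundedPrecedingFunction is the simplest of the set: one accumulator per key, accumulate every row, emit the row joined with the current aggregate. A minimal standalone sketch of that pattern with a running MIN, not part of the commit:

    // Illustration only: running aggregate over an unbounded proc-time window.
    public class UnboundedOverDemo {
        public static void main(String[] args) {
            long acc = Long.MAX_VALUE;                 // accumulator for MIN, analogous to accState
            for (long in : new long[]{7L, 4L, 9L, 2L}) {
                acc = Math.min(acc, in);               // function.accumulate(input)
                System.out.println("input=" + in + ", min so far=" + acc);   // output.replace(input, aggValue)
            }
        }
    }
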
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
new file mode 100644
index 0000000..8133d2f
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN INTERVAL '4' SECOND PRECEDING AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRangeBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRangeBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final long precedingOffset;
+	private final int rowTimeIdx;
+
+	private transient JoinedRow output;
+
+	// the state which keeps the last triggering timestamp
+	private transient ValueState<Long> lastTriggeringTsState;
+
+	// the state which is used to materialize the accumulator for incremental calculation
+	private transient ValueState<BaseRow> accState;
+
+	// the state which keeps all the data that are not expired.
+	// The map key is the timestamp. Per timestamp, the value
+	// is a list that contains the entire data of all the rows belonging
+	// to this timestamp.
+	private transient MapState<Long, List<BaseRow>> inputState;
+
+	private transient AggsHandleFunction function;
+
+	public RowTimeRangeBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime);
+		Preconditions.checkNotNull(precedingOffset);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+		this.rowTimeIdx = rowTimeIdx;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		ValueStateDescriptor<Long> lastTriggeringTsDescriptor = new ValueStateDescriptor<Long>(
+			"lastTriggeringTsState",
+			Types.LONG);
+		lastTriggeringTsState = getRuntimeContext().getState(lastTriggeringTsDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> accStateDesc = new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(accStateDesc);
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState",
+			Types.LONG,
+			rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+		initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(rowTimeIdx);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// check if the data is expired, if not, save the data and register event time timer
+		if (triggeringTs > lastTriggeringTs) {
+			List<BaseRow> data = inputState.get(triggeringTs);
+			if (null != data) {
+				data.add(input);
+				inputState.put(triggeringTs, data);
+			} else {
+				data = new ArrayList<BaseRow>();
+				data.add(input);
+				inputState.put(triggeringTs, data);
+				// register event time timer
+				ctx.timerService().registerEventTimeTimer(triggeringTs);
+			}
+		}
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		if (isProcessingTimeTimer(ctx)) {
+			if (needToCleanupState(timestamp)) {
+
+				Iterator<Long> keysIt = inputState.keys().iterator();
+				Long lastProcessedTime = lastTriggeringTsState.value();
+				if (lastProcessedTime == null) {
+					lastProcessedTime = 0L;
+				}
+
+				// is data left which has not been processed yet?
+				boolean noRecordsToProcess = true;
+				while (keysIt.hasNext() && noRecordsToProcess) {
+					if (keysIt.next() > lastProcessedTime) {
+						noRecordsToProcess = false;
+					}
+				}
+
+				if (noRecordsToProcess) {
+					// we clean the state
+					cleanupState(inputState, accState, lastTriggeringTsState);
+					function.cleanup();
+				} else {
+					// There are records left to process because a watermark has not been received yet.
+					// This would only happen if the input stream has stopped. So we don't need to clean up.
+					// We leave the state as it is and schedule a new cleanup timer
+					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+				}
+			}
+			return;
+		}
+
+		// gets all window data from state for the calculation
+		List<BaseRow> inputs = inputState.get(timestamp);
+
+		if (null != inputs) {
+
+			int dataListIndex = 0;
+			BaseRow accumulators = accState.value();
+
+			// initialize on the first run or after failover recovery, per key
+			if (null == accumulators) {
+				accumulators = function.createAccumulators();
+			}
+			// set accumulators in context first
+			function.setAccumulators(accumulators);
+
+			// collect the timestamps of the data to retract
+			List<Long> retractTsList = new ArrayList<Long>();
+
+			// do retraction
+			Iterator<Long> dataTimestampIt = inputState.keys().iterator();
+			while (dataTimestampIt.hasNext()) {
+				Long dataTs = dataTimestampIt.next();
+				Long offset = timestamp - dataTs;
+				if (offset > precedingOffset) {
+					List<BaseRow> retractDataList = inputState.get(dataTs);
+					if (retractDataList != null) {
+						dataListIndex = 0;
+						while (dataListIndex < retractDataList.size()) {
+							BaseRow retractRow = retractDataList.get(dataListIndex);
+							function.retract(retractRow);
+							dataListIndex += 1;
+						}
+						retractTsList.add(dataTs);
+					} else {
+						// Values outside of the window are not retracted if the state has already been cleared.
+						LOG.warn("The state is cleared because of state ttl. " +
+							"This will result in incorrect results. " +
+							"You can increase the state ttl to avoid this.");
+					}
+				}
+			}
+
+			// do accumulation
+			dataListIndex = 0;
+			while (dataListIndex < inputs.size()) {
+				BaseRow curRow = inputs.get(dataListIndex);
+				// accumulate current row
+				function.accumulate(curRow);
+				dataListIndex += 1;
+			}
+
+			// get aggregate result
+			BaseRow aggValue = function.getValue();
+
+			// copy forwarded fields to output row and emit output row
+			dataListIndex = 0;
+			while (dataListIndex < inputs.size()) {
+				BaseRow curRow = inputs.get(dataListIndex);
+				output.replace(curRow, aggValue);
+				out.collect(output);
+				dataListIndex += 1;
+			}
+
+			// remove the data that has been retracted
+			dataListIndex = 0;
+			while (dataListIndex < retractTsList.size()) {
+				inputState.remove(retractTsList.get(dataListIndex));
+				dataListIndex += 1;
+			}
+
+			// update the value of accumulators for future incremental computation
+			accumulators = function.getAccumulators();
+			accState.update(accumulators);
+		}
+		lastTriggeringTsState.update(timestamp);
+
+		// update cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
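
One firing of onTimer in RowTimeRangeBoundedPrecedingFunction retracts all rows older than the timer timestamp minus precedingOffset, accumulates the rows registered at the timer timestamp, and emits the same aggregate for each of them. A standalone sketch with a SUM aggregate; it is not part of the commit and all timestamps and values are invented:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Illustration only: one event-time RANGE timer firing at ts=7000 with a 4s preceding offset.
    public class RowTimeRangeTimerDemo {
        public static void main(String[] args) {
            long precedingOffset = 4000L;
            long timerTs = 7000L;

            // buffered rows per event timestamp, analogous to inputState
            Map<Long, List<Integer>> inputState = new TreeMap<>();
            inputState.put(1000L, Arrays.asList(5));
            inputState.put(3000L, Arrays.asList(2, 4));
            inputState.put(7000L, Arrays.asList(7));

            long sum = 11;   // accumulator carried over from earlier firings: 5 + 2 + 4

            // retraction: rows older than timerTs - precedingOffset leave the window
            for (Map.Entry<Long, List<Integer>> e : inputState.entrySet()) {
                if (timerTs - e.getKey() > precedingOffset) {
                    for (int v : e.getValue()) {
                        sum -= v;                      // function.retract(retractRow)
                    }
                }
            }
            // accumulation: all rows registered at the timer's timestamp enter the window
            for (int v : inputState.get(timerTs)) {
                sum += v;                              // function.accumulate(curRow)
            }
            // every row at timerTs is emitted with the same aggregate value
            System.out.println("rows at ts=7000 emit sum=" + sum);   // 2 + 4 + 7 = 13
        }
    }
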
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..450dd56
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeUnboundedPrecedingFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * A ProcessFunction to support unbounded RANGE window.
+ * The RANGE option includes all the rows within the window frame
+ * that have the same ORDER BY values as the current row.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * RANGE BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRangeUnboundedPrecedingFunction<K> extends AbstractRowTimeUnboundedPrecedingOver<K> {
+	private static final long serialVersionUID = 1L;
+
+	public RowTimeRangeUnboundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime, genAggsHandler, accTypes, inputFieldTypes, rowTimeIdx);
+	}
+
+	@Override
+	public void processElementsWithSameTimestamp(
+			List<BaseRow> curRowList,
+			Collector<BaseRow> out) throws Exception {
+		int i = 0;
+		// all same timestamp data should have same aggregation value.
+		while (i < curRowList.size()) {
+			BaseRow curRow = curRowList.get(i);
+			function.accumulate(curRow);
+			i += 1;
+		}
+
+		// emit output row
+		i = 0;
+		BaseRow aggValue = function.getValue();
+		while (i < curRowList.size()) {
+			BaseRow curRow = curRowList.get(i);
+			// prepare output row
+			output.replace(curRow, aggValue);
+			out.collect(output);
+			i += 1;
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
new file mode 100644
index 0000000..c0ccd38
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.generated.AggsHandleFunction;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRowsBoundedPrecedingFunction<K> extends KeyedProcessFunctionWithCleanupState<K, BaseRow, BaseRow> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RowTimeRowsBoundedPrecedingFunction.class);
+
+	private final GeneratedAggsHandleFunction genAggsHandler;
+	private final InternalType[] accTypes;
+	private final InternalType[] inputFieldTypes;
+	private final long precedingOffset;
+	private final int rowTimeIdx;
+
+	private transient JoinedRow output;
+
+	// the state which keeps the last triggering timestamp
+	private transient ValueState<Long> lastTriggeringTsState;
+
+	// the state which keeps the count of data
+	private transient ValueState<Long> counterState;
+
+	// the state which is used to materialize the accumulator for incremental calculation
+	private transient ValueState<BaseRow> accState;
+
+	// the state which keeps all the data that are not expired.
+	// The map key is the timestamp. Per timestamp, the value
+	// is a list that contains the entire data of all the rows belonging
+	// to this timestamp.
+	private transient MapState<Long, List<BaseRow>> inputState;
+
+	private transient AggsHandleFunction function;
+
+	public RowTimeRowsBoundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			long precedingOffset,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime);
+		Preconditions.checkNotNull(precedingOffset);
+		this.genAggsHandler = genAggsHandler;
+		this.accTypes = accTypes;
+		this.inputFieldTypes = inputFieldTypes;
+		this.precedingOffset = precedingOffset;
+		this.rowTimeIdx = rowTimeIdx;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		function = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
+		function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
+
+		output = new JoinedRow();
+
+		ValueStateDescriptor<Long> lastTriggeringTsDescriptor = new ValueStateDescriptor<Long>(
+			"lastTriggeringTsState",
+			Types.LONG);
+		lastTriggeringTsState = getRuntimeContext().getState(lastTriggeringTsDescriptor);
+
+		ValueStateDescriptor<Long> dataCountStateDescriptor = new ValueStateDescriptor<Long>(
+			"processedCountState",
+			Types.LONG);
+		counterState = getRuntimeContext().getState(dataCountStateDescriptor);
+
+		BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(accTypes);
+		ValueStateDescriptor<BaseRow> accStateDesc = new ValueStateDescriptor<BaseRow>("accState", accTypeInfo);
+		accState = getRuntimeContext().getState(accStateDesc);
+
+		// input elements are all binary rows as they come from the network
+		BaseRowTypeInfo inputType = new BaseRowTypeInfo(inputFieldTypes);
+		ListTypeInfo<BaseRow> rowListTypeInfo = new ListTypeInfo<BaseRow>(inputType);
+		MapStateDescriptor<Long, List<BaseRow>> inputStateDesc = new MapStateDescriptor<Long, List<BaseRow>>(
+			"inputState",
+			Types.LONG,
+			rowListTypeInfo);
+		inputState = getRuntimeContext().getMapState(inputStateDesc);
+
+		initCleanupTimeState("RowTimeBoundedRowsOverCleanupTime");
+	}
+
+	@Override
+	public void processElement(
+			BaseRow input,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.Context ctx,
+			Collector<BaseRow> out) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(rowTimeIdx);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// check if the data is expired, if not, save the data and register event time timer
+		if (triggeringTs > lastTriggeringTs) {
+			List<BaseRow> data = inputState.get(triggeringTs);
+			if (null != data) {
+				data.add(input);
+				inputState.put(triggeringTs, data);
+			} else {
+				data = new ArrayList<BaseRow>();
+				data.add(input);
+				inputState.put(triggeringTs, data);
+				// register event time timer
+				ctx.timerService().registerEventTimeTimer(triggeringTs);
+			}
+		}
+	}
+
+	@Override
+	public void onTimer(
+			long timestamp,
+			KeyedProcessFunction<K, BaseRow, BaseRow>.OnTimerContext ctx,
+			Collector<BaseRow> out) throws Exception {
+		if (isProcessingTimeTimer(ctx)) {
+			if (needToCleanupState(timestamp)) {
+
+				Iterator<Long> keysIt = inputState.keys().iterator();
+				Long lastProcessedTime = lastTriggeringTsState.value();
+				if (lastProcessedTime == null) {
+					lastProcessedTime = 0L;
+				}
+
+				// is data left which has not been processed yet?
+				boolean noRecordsToProcess = true;
+				while (keysIt.hasNext() && noRecordsToProcess) {
+					if (keysIt.next() > lastProcessedTime) {
+						noRecordsToProcess = false;
+					}
+				}
+
+				if (noRecordsToProcess) {
+					// We clean the state
+					cleanupState(inputState, accState, counterState, lastTriggeringTsState);
+					function.cleanup();
+				} else {
+					// There are records left to process because a watermark has not been received yet.
+					// This would only happen if the input stream has stopped. So we don't need to clean up.
+					// We leave the state as it is and schedule a new cleanup timer
+					registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+				}
+			}
+			return;
+		}
+
+		// gets all window data from state for the calculation
+		List<BaseRow> inputs = inputState.get(timestamp);
+
+		if (null != inputs) {
+
+			Long dataCount = counterState.value();
+			if (dataCount == null) {
+				dataCount = 0L;
+			}
+
+			BaseRow accumulators = accState.value();
+			if (accumulators == null) {
+				accumulators = function.createAccumulators();
+			}
+			// set accumulators in context first
+			function.setAccumulators(accumulators);
+
+			List<BaseRow> retractList = null;
+			long retractTs = Long.MAX_VALUE;
+			int retractCnt = 0;
+			int i = 0;
+
+			while (i < inputs.size()) {
+				BaseRow input = inputs.get(i);
+				BaseRow retractRow = null;
+				if (dataCount >= precedingOffset) {
+					if (null == retractList) {
+						// find the smallest timestamp
+						retractTs = Long.MAX_VALUE;
+						for (Long dataTs : inputState.keys()) {
+							if (dataTs < retractTs) {
+								retractTs = dataTs;
+								// get the oldest rows to retract them
+								retractList = inputState.get(dataTs);
+							}
+						}
+					}
+
+					if (retractList != null) {
+						retractRow = retractList.get(retractCnt);
+						retractCnt += 1;
+
+						// remove retracted values from state
+						if (retractList.size() == retractCnt) {
+							inputState.remove(retractTs);
+							retractList = null;
+							retractCnt = 0;
+						}
+					}
+				} else {
+					dataCount += 1;
+				}
+
+				// retract old row from accumulators
+				if (null != retractRow) {
+					function.retract(retractRow);
+				}
+
+				// accumulate current row
+				function.accumulate(input);
+
+				// prepare output row
+				output.replace(input, function.getValue());
+				out.collect(output);
+
+				i += 1;
+			}
+
+			// update all states
+			if (inputState.contains(retractTs)) {
+				if (retractCnt > 0) {
+					retractList.subList(0, retractCnt).clear();
+					inputState.put(retractTs, retractList);
+				}
+			}
+			counterState.update(dataCount);
+			// update the value of accumulators for future incremental computation
+			accumulators = function.getAccumulators();
+			accState.update(accumulators);
+		}
+
+		lastTriggeringTsState.update(timestamp);
+
+		// update cleanup timer
+		registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime());
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (null != function) {
+			function.close();
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java
new file mode 100644
index 0000000..1d0a26d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsUnboundedPrecedingFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.over;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * A ProcessFunction to support unbounded ROWS windows.
+ * The ROWS clause defines on a physical level how many rows are included in a window frame.
+ *
+ * <p>E.g.:
+ * SELECT rowtime, b, c,
+ * min(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW),
+ * max(c) OVER
+ * (PARTITION BY b ORDER BY rowtime
+ * ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)
+ * FROM T.
+ */
+public class RowTimeRowsUnboundedPrecedingFunction<K> extends AbstractRowTimeUnboundedPrecedingOver<K> {
+	private static final long serialVersionUID = 1L;
+
+	public RowTimeRowsUnboundedPrecedingFunction(
+			long minRetentionTime,
+			long maxRetentionTime,
+			GeneratedAggsHandleFunction genAggsHandler,
+			InternalType[] accTypes,
+			InternalType[] inputFieldTypes,
+			int rowTimeIdx) {
+		super(minRetentionTime, maxRetentionTime, genAggsHandler, accTypes, inputFieldTypes, rowTimeIdx);
+	}
+
+	@Override
+	public void processElementsWithSameTimestamp(
+			List<BaseRow> curRowList,
+			Collector<BaseRow> out) throws Exception {
+		int i = 0;
+		while (i < curRowList.size()) {
+			BaseRow curRow = curRowList.get(i);
+			// accumulate current row
+			function.accumulate(curRow);
+			// prepare output row
+			output.replace(curRow, function.getValue());
+			// emit output row
+			out.collect(output);
+			i += 1;
+		}
+	}
+}
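
The unbounded variant needs no retraction at all: the accumulator only ever grows with the partition. A minimal sketch of that semantics (a running MIN in plain Java, independent of the state handling and same-timestamp batching above; all names are illustrative):

import java.util.Arrays;
import java.util.List;

/** Illustrative only: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW as a running MIN. */
public class UnboundedRowsOverSketch {

	public static void main(String[] args) {
		List<Integer> partition = Arrays.asList(5, 3, 7, 2, 8); // rows of one partition, in row time order
		int min = Integer.MAX_VALUE;                            // accumulator, never retracted from

		for (int c : partition) {
			min = Math.min(min, c);                                // accumulate current row
			System.out.println("c=" + c + ", min so far=" + min); // emit one output row per input row
		}
		// the min column reads 5, 3, 3, 2, 2
	}
}
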
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
index 3397817..273ab5f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java
@@ -161,6 +161,10 @@ public class TypeConverters {
 			ObjectArrayTypeInfo arrayType = (ObjectArrayTypeInfo) typeInfo;
 			return InternalTypes.createArrayType(
 					createInternalTypeFromTypeInfo(arrayType.getComponentInfo()));
+		} else if (typeInfo instanceof MultisetTypeInfo) {
+			MultisetTypeInfo multisetType = (MultisetTypeInfo) typeInfo;
+			return InternalTypes.createMultisetType(
+				createInternalTypeFromTypeInfo(multisetType.getElementTypeInfo()));
 		} else if (typeInfo instanceof MapTypeInfo) {
 			MapTypeInfo mapType = (MapTypeInfo) typeInfo;
 			return InternalTypes.createMapType(
@@ -236,20 +240,19 @@ public class TypeConverters {
 		} else if (type instanceof ArrayType) {
 			return ObjectArrayTypeInfo.getInfoFor(
 					createExternalTypeInfoFromInternalType(((ArrayType) type).getElementType()));
+		} else if (type instanceof MultisetType) {
+			MultisetType multisetType = (MultisetType) type;
+			return MultisetTypeInfo.getInfoFor(
+				createExternalTypeInfoFromInternalType(multisetType.getElementType()));
 		} else if (type instanceof MapType) {
 			MapType mapType = (MapType) type;
 			return new MapTypeInfo(
 					createExternalTypeInfoFromInternalType(mapType.getKeyType()),
 					createExternalTypeInfoFromInternalType(mapType.getValueType()));
-		} else if (type instanceof MultisetType) {
-			MultisetType multisetType = (MultisetType) type;
-			return MultisetTypeInfo.getInfoFor(
-				createExternalTypeInfoFromInternalType(multisetType.getElementType()));
-		}
-		else if (type instanceof DecimalType) {
+		} else if (type instanceof DecimalType) {
 			DecimalType decimalType = (DecimalType) type;
 			return new BigDecimalTypeInfo(decimalType.precision(), decimalType.scale());
-		}  else if (type instanceof GenericType) {
+		} else if (type instanceof GenericType) {
 			GenericType genericType = (GenericType) type;
 			return genericType.getTypeInfo();
 		} else {
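
A quick round-trip sketch for the new multiset branch (a hypothetical snippet for illustration only; it assumes the two conversion methods are static and uses MultisetTypeInfo from flink-core):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
import org.apache.flink.table.type.InternalType;
import org.apache.flink.table.type.TypeConverters;

public class MultisetConversionSketch {

	public static void main(String[] args) {
		// external -> internal: MULTISET<STRING> maps to a MultisetType with a String element type
		MultisetTypeInfo<String> external = MultisetTypeInfo.getInfoFor(BasicTypeInfo.STRING_TYPE_INFO);
		InternalType internal = TypeConverters.createInternalTypeFromTypeInfo(external);

		// internal -> external: converts back to a MultisetTypeInfo with the same element type
		System.out.println(TypeConverters.createExternalTypeInfoFromInternalType(internal));
	}
}
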
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
index 392e062..c95d547 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BaseRowHarnessAssertor.java
@@ -47,15 +47,15 @@ public class BaseRowHarnessAssertor {
 	private final TypeInformation[] typeInfos;
 	private final Comparator<GenericRow> comparator;
 
-	public BaseRowHarnessAssertor(TypeInformation[] typeInfos) {
-		this(typeInfos, null);
-	}
-
 	public BaseRowHarnessAssertor(TypeInformation[] typeInfos, Comparator<GenericRow> comparator) {
 		this.typeInfos = typeInfos;
 		this.comparator = comparator;
 	}
 
+	public BaseRowHarnessAssertor(TypeInformation[] typeInfos) {
+		this(typeInfos, new StringComparator());
+	}
+
 
 	/**
 	 * Compare the two queues containing operator/task output by converting them to an array first.
@@ -73,9 +73,9 @@ public class BaseRowHarnessAssertor {
 	 * comparator. Asserts that the two sorted, converted arrays are equal.
 	 */
 	public void assertOutputEqualsSorted(
-			String message,
-			Collection<Object> expected,
-			Collection<Object> actual) {
+		String message,
+		Collection<Object> expected,
+		Collection<Object> actual) {
 		assertOutputEquals(message, expected, actual, true);
 	}
 
@@ -133,4 +133,10 @@ public class BaseRowHarnessAssertor {
 		Assert.assertArrayEquals(message, sortedExpected, sortedActual);
 	}
 
+	private static class StringComparator implements Comparator<GenericRow> {
+		@Override
+		public int compare(GenericRow o1, GenericRow o2) {
+			return o1.toString().compareTo(o2.toString());
+		}
+	}
 }
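
With the constructors reordered above, the single-argument constructor now defaults to the new StringComparator, so sorted assertions work without supplying a comparator. A minimal, hypothetical usage sketch (field types and collections are made up for illustration):

import java.util.ArrayList;
import java.util.Collection;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor;

public class AssertorUsageSketch {

	public static void main(String[] args) {
		// no explicit comparator: rows are ordered by their string representation before comparison
		BaseRowHarnessAssertor assertor = new BaseRowHarnessAssertor(
			new TypeInformation[]{Types.INT, Types.STRING});

		Collection<Object> expected = new ArrayList<>();
		Collection<Object> actual = new ArrayList<>();
		// ... collect harness output records into both collections ...
		assertor.assertOutputEqualsSorted("unexpected over-aggregate output", expected, actual);
	}
}
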
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 91eff9e..bd80dfc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -20,8 +20,16 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.dataformat.BinaryRowWriter;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.BinaryWriter;
 import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.InternalTypes;
+
+import java.util.Date;
 
 import static org.apache.flink.table.dataformat.BinaryString.fromString;
 
@@ -83,6 +91,43 @@ public class StreamRecordUtils {
 		return GenericRow.of(objects);
 	}
 
+	/**
+	 * Receives an object array and generates a BinaryRow based on it.
+	 *
+	 * @param fields input object array
+	 * @return generated BinaryRow.
+	 */
+	public static BinaryRow binaryrow(Object... fields) {
+		BinaryRow row = new BinaryRow(fields.length);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		for (int j = 0; j < fields.length; j++) {
+			Object value = fields[j];
+			if (value == null) {
+				writer.setNullAt(j);
+			} else if (value instanceof Integer) {
+				writer.writeInt(j, (Integer) value);
+			} else if (value instanceof String) {
+				writer.writeString(j, BinaryString.fromString((String) value));
+			} else if (value instanceof Double) {
+				writer.writeDouble(j, (Double) value);
+			} else if (value instanceof Float) {
+				writer.writeFloat(j, (Float) value);
+			} else if (value instanceof Long) {
+				writer.writeLong(j, (Long) value);
+			} else if (value instanceof Boolean) {
+				writer.writeBoolean(j, (Boolean) value);
+			} else if (value instanceof Date) {
+				InternalType internalType = InternalTypes.DATE;
+				BinaryWriter.write(writer, j, value, internalType);
+			} else {
+				writer.setNullAt(j);
+			}
+		}
+
+		writer.complete();
+		return row;
+	}
+
 	private StreamRecordUtils() {
 		// prevent instantiation of this utility class
 	}
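
A small usage sketch of the new helper, for context (values chosen arbitrarily; nulls and unsupported field types are written as null fields):

import org.apache.flink.table.dataformat.BinaryRow;

import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow;

public class BinaryRowSketch {

	public static void main(String[] args) {
		// a four-field row: INT, STRING, a null field, DOUBLE
		BinaryRow row = binaryrow(42, "hello", null, 3.14);
		System.out.println(row.getArity());   // 4
		System.out.println(row.isNullAt(2));  // true
	}
}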