Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/07 17:56:51 UTC

[2/2] flink git commit: [FLINK-5803] [table] Add support for procTime partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.

[FLINK-5803] [table] Add support for procTime partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.

This closes #3397.
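
For reference, the new feature covers queries of the following shape (a sketch adapted
from the SqlITCase tests added below; table and column names are illustrative):

  SELECT c,
    COUNT(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED PRECEDING) AS cnt1,
    SUM(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED PRECEDING) AS cnt2
  FROM T1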


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fb8f3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fb8f3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fb8f3b

Branch: refs/heads/master
Commit: 53fb8f3b532a1be3d3cd655fbb5516a7e1ae8ada
Parents: cd801aa
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Wed Feb 15 18:16:06 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Mar 7 18:56:15 2017 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |   7 +
 .../table/codegen/calls/FunctionGenerator.scala |   2 +-
 .../flink/table/plan/nodes/OverAggregate.scala  |  95 ++++++++++
 .../datastream/DataStreamOverAggregate.scala    | 185 +++++++++++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   7 +-
 .../DataStreamOverAggregateRule.scala           |  63 +++++++
 .../table/runtime/aggregate/AggregateUtil.scala |  63 ++++---
 ...UnboundedProcessingOverProcessFunction.scala |  85 +++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  | 101 +++++++++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  56 ++++++
 .../utils/StreamingWithStateTestBase.scala      |  40 ++++
 11 files changed, 673 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index c6071b0..428b947 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -117,6 +117,13 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7d55957..27e6dc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -330,7 +330,7 @@ object FunctionGenerator {
       )
 
     // generate a constant for time indicator functions.
-    //   this is a temporary solution and will be removed when FLINK-5884 is implemented.
+    // this is a temporary solution and will be removed when FLINK-5884 is implemented.
     case ProcTimeExtractor | EventTimeExtractor =>
       Some(new CallGenerator {
         override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
new file mode 100644
index 0000000..793ab23
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import scala.collection.JavaConverters._
+
+trait OverAggregate {
+
+  private[flink] def partitionToString(inputType: RelDataType, partition: Array[Int]): String = {
+    val inFields = inputType.getFieldNames.asScala
+    partition.map(inFields(_)).mkString(", ")
+  }
+
+  private[flink] def orderingToString(
+    inputType: RelDataType,
+    orderFields: java.util.List[RelFieldCollation]): String = {
+
+    val inFields = inputType.getFieldList.asScala
+
+    val orderingString = orderFields.asScala.map {
+      x => inFields(x.getFieldIndex).getValue
+    }.mkString(", ")
+
+    orderingString
+  }
+
+  private[flink] def windowRange(overWindow: Group): String = {
+    s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
+  }
+
+  private[flink] def aggregationToString(
+    inputType: RelDataType,
+    rowType: RelDataType,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
+
+    val inFields = inputType.getFieldList.asScala.map {
+      x =>
+        x.getType match {
+          case _: ProcTimeType => "PROCTIME"
+          case _: RowTimeType => "ROWTIME"
+          case _ => x.getName
+        }
+    }
+    val outFields = rowType.getFieldList.asScala.map {
+      x =>
+        x.getType match {
+          case _: ProcTimeType => "PROCTIME"
+          case _: RowTimeType => "ROWTIME"
+          case _ => x.getName
+        }
+    }
+
+    val aggStrings = namedAggregates.map(_.getKey).map(
+      a => s"${a.getAggregation}(${
+        if (a.getArgList.size() > 0) {
+          inFields(a.getArgList.get(0))
+        } else {
+          "*"
+        }
+      })")
+
+    (inFields ++ aggStrings).zip(outFields).map {
+      case (f, o) => if (f == o) {
+        f
+      } else {
+        s"$f AS $o"
+      }
+    }.mkString(", ")
+  }
+
+}
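
These helpers render the OVER aggregate's explain terms; for the plans asserted in the
new WindowAggregateTest below they produce, e.g., partitionBy: c, orderBy: PROCTIME,
range: BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, and select: a, c, PROCTIME,
COUNT(a) AS w0$o0.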

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
new file mode 100644
index 0000000..db115e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rel.core.Window.Group
+import java.util.{List => JList}
+
+import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+
+class DataStreamOverAggregate(
+    logicWindow: Window,
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    rowRelDataType: RelDataType,
+    inputType: RelDataType)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with OverAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new DataStreamOverAggregate(
+      logicWindow,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      getRowType,
+      inputType)
+  }
+
+  override def toString: String = {
+    s"OverAggregate($aggOpName)"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    super.explainTerms(pw)
+      .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
+        .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
+      .itemIf("rows", windowRange(overWindow), overWindow.isRows)
+      .itemIf("range", windowRange(overWindow), !overWindow.isRows)
+      .item(
+        "select", aggregationToString(
+          inputType,
+          getRowType,
+          namedAggregates))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+    if (logicWindow.groups.size > 1) {
+      throw new TableException(
+        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
+    }
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
+      throw new TableException(
+        "Unsupported use of OVER windows. The window may only be ordered by a single time column.")
+    }
+
+    val timeType = inputType
+      .getFieldList
+      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
+      .getValue
+
+    timeType match {
+      case _: ProcTimeType =>
+        // ROWS and RANGE clauses with UNBOUNDED PRECEDING and CURRENT ROW are both supported.
+        if (overWindow.lowerBound.isUnbounded &&
+          overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+        } else {
+          throw new TableException(
+            "Processing-time OVER windows only support UNBOUNDED PRECEDING and CURRENT ROW " +
+            "bounds.")
+        }
+      case _: RowTimeType =>
+        throw new TableException("OVER windows on event time are not currently supported.")
+      case _ =>
+        throw new TableException(s"Unsupported time type: $timeType")
+    }
+
+  }
+
+  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
+    inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    val result: DataStream[Row] =
+      // partitioned aggregation
+      if (partitionKeys.nonEmpty) {
+        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+          namedAggregates,
+          inputType)
+
+        inputDS
+          .keyBy(partitionKeys: _*)
+          .process(processFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Row]]
+      }
+      // global non-partitioned aggregation
+      else {
+        throw new TableException(
+          "Non-partitioned processing time OVER aggregation is not supported yet.")
+      }
+    result
+  }
+
+  private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = {
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
+    for (i <- 0 until aggregateCalls.size())
+      yield new CalcitePair[AggregateCall, String](aggregateCalls.get(i), "w0$o" + i)
+  }
+
+  private def aggOpName: String = {
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    s"over: (${
+      if (partitionKeys.nonEmpty) {
+        s"PARTITION BY: ${partitionToString(inputType, partitionKeys)}, "
+      } else {
+        ""
+      }
+    }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
+      s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
+      s"${windowRange(overWindow)}, " +
+      s"select: (${
+        aggregationToString(
+          inputType,
+          getRowType,
+          namedAggregates)
+      }))"
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 8f16d32..39e4353 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -35,7 +35,8 @@ object FlinkRuleSets {
     ReduceExpressionsRule.FILTER_INSTANCE,
     ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
-    ReduceExpressionsRule.JOIN_INSTANCE
+    ReduceExpressionsRule.JOIN_INSTANCE,
+    ProjectToWindowRule.PROJECT
   )
 
   /**
@@ -134,7 +135,8 @@ object FlinkRuleSets {
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,
     ReduceExpressionsRule.PROJECT_INSTANCE,
-    ReduceExpressionsRule.CALC_INSTANCE
+    ReduceExpressionsRule.CALC_INSTANCE,
+    ProjectToWindowRule.PROJECT
   )
 
   /**
@@ -167,6 +169,7 @@ object FlinkRuleSets {
       UnionEliminatorRule.INSTANCE,
 
       // translate to DataStream nodes
+      DataStreamOverAggregateRule.INSTANCE,
       DataStreamAggregateRule.INSTANCE,
       DataStreamCalcRule.INSTANCE,
       DataStreamScanRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
new file mode 100644
index 0000000..214d68b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalWindow
+import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
+
+/**
+  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
+  */
+class DataStreamOverAggregateRule
+  extends ConverterRule(
+    classOf[LogicalWindow],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamOverAggregateRule") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convertInput: RelNode =
+      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
+
+    val inputRowType = convertInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+    new DataStreamOverAggregate(
+      logicWindow,
+      rel.getCluster,
+      traitSet,
+      convertInput,
+      rel.getRowType,
+      inputRowType)
+  }
+}
+
+object DataStreamOverAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamOverAggregateRule
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 745660d..acb6cd0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
@@ -36,6 +36,7 @@ import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.functions.aggfunctions._
 import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
@@ -78,8 +79,7 @@ object AggregateUtil {
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length,
-      false)
+      needRetraction = false)
 
     val mapReturnType: RowTypeInfo =
       createDataSetAggregateBufferDataType(groupings, aggregates, inputType)
@@ -94,6 +94,33 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[ProcessFunction]] to evaluate the final aggregate values.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType Input row type
+    * @return [[UnboundedProcessingOverProcessFunction]]
+    */
+  private[flink] def createUnboundedProcessingOverProcessFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType): UnboundedProcessingOverProcessFunction = {
+
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        needRetraction = false)
+
+    val aggregationStateType: RowTypeInfo =
+      createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+
+    new UnboundedProcessingOverProcessFunction(
+      aggregates,
+      aggFields,
+      inputType.getFieldCount,
+      aggregationStateType)
+  }
+
+  /**
     * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
     * The output of the function contains the grouping keys and the timestamp and the intermediate
     * aggregate values of all aggregate function. The timestamp field is aligned to time window
@@ -126,8 +153,7 @@ object AggregateUtil {
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length,
-      false)
+      needRetraction = false)
 
     val mapReturnType: RowTypeInfo =
       createDataSetAggregateBufferDataType(
@@ -180,8 +206,7 @@ object AggregateUtil {
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length,
-      false)._2
+      needRetraction = false)._2
 
     // the mapping relation between field index of intermediate aggregate Row and output Row.
     val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -278,7 +303,7 @@ object AggregateUtil {
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      0)._2
+      needRetraction = false)._2
 
     window match {
       case EventTimeSessionGroupWindow(_, _, gap) =>
@@ -327,8 +352,7 @@ object AggregateUtil {
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length,
-      false)._2
+      needRetraction = false)._2
 
     window match {
       case EventTimeSessionGroupWindow(_, _, gap) =>
@@ -366,11 +390,10 @@ object AggregateUtil {
       inGroupingSet: Boolean)
     : RichGroupReduceFunction[Row, Row] = {
 
-    val (aggFieldIndex, aggregates) = transformToAggregateFunctions(
+    val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      groupings.length,
-      false)
+      needRetraction = false)._2
 
     val (groupingOffsetMapping, aggOffsetMapping) =
       getGroupingOffsetAndAggOffsetMapping(
@@ -466,8 +489,7 @@ object AggregateUtil {
       transformToAggregateFunctions(
         namedAggregates.map(_.getKey),
         inputType,
-        groupKeysIndex.length,
-        false)
+        needRetraction = false)
 
     val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
 
@@ -496,8 +518,7 @@ object AggregateUtil {
     val aggregateList = transformToAggregateFunctions(
       aggregateCalls,
       inputType,
-      groupKeysCount,
-      false)._2
+      needRetraction = false)._2
 
     doAllSupportPartialMerge(aggregateList)
   }
@@ -598,7 +619,7 @@ object AggregateUtil {
 
     val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
       (p, x) => p match {
-        case NamedWindowProperty(name, prop) =>
+        case NamedWindowProperty(_, prop) =>
           prop match {
             case WindowStart(_) if x._1.isDefined =>
               throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
@@ -617,7 +638,6 @@ object AggregateUtil {
   private def transformToAggregateFunctions(
       aggregateCalls: Seq[AggregateCall],
       inputType: RelDataType,
-      groupKeysCount: Int,
       needRetraction: Boolean)
   : (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
 
@@ -625,11 +645,6 @@ object AggregateUtil {
     val aggFieldIndexes = new Array[Int](aggregateCalls.size)
     val aggregates = new Array[TableAggregateFunction[_ <: Any]](aggregateCalls.size)
 
-    // set the start offset of aggregate buffer value to group keys' length,
-    // as all the group keys would be moved to the start fields of intermediate
-    // aggregate data.
-    var aggOffset = groupKeysCount
-
     // create aggregate function instances by function type and aggregate field data type.
     aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
       val argList: util.List[Integer] = aggregateCall.getArgList

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
new file mode 100644
index 0000000..058b4a7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
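+/**
+  * A [[ProcessFunction]] that computes unbounded processing-time OVER window aggregates.
+  * It keeps one accumulator row per key in [[ValueState]] and, for every input row, emits
+  * the forwarded input fields together with the current aggregate values.
+  */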
+class UnboundedProcessingOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: ValueState[Row] = _
+
+  override def open(config: Configuration): Unit = {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    var accumulators = state.value()
+
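+    // create and initialize the accumulators on the first record for this key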
+    if (null == accumulators) {
+      accumulators = new Row(aggregates.length)
+      var i = 0
+      while (i < aggregates.length) {
+        accumulators.setField(i, aggregates(i).createAccumulator())
+        i += 1
+      }
+    }
+
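+    // copy the forwarded input fields into the output row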
+    var i = 0
+    while (i < forwardedFieldCount) {
+      output.setField(i, input.getField(i))
+      i += 1
+    }
+
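+    // accumulate the current input and set the updated aggregate values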
+    i = 0
+    while (i < aggregates.length) {
+      val index = forwardedFieldCount + i
+      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+      output.setField(index, aggregates(i).getValue(accumulator))
+      i += 1
+    }
+    state.update(accumulators)
+
+    out.collect(output)
+  }
+
+}
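
For illustration: with the test data added below, key "Hello" sees the inputs a = 1 to 6
in order, so this function emits the count/sum pairs (1,1), (2,3), (3,6), (4,10), (5,15),
(6,21), which are exactly the expected results of
testUnboundPartitionedProcessingWindowWithRange in SqlITCase.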

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 70bec72..cf8e442 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -20,17 +20,28 @@ package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData,
+  StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
 
 import scala.collection.mutable
 
-class SqlITCase extends StreamingMultipleProgramsTestBase {
+class SqlITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+    (1L, 1, "Hello"),
+    (2L, 2, "Hello"),
+    (3L, 3, "Hello"),
+    (4L, 4, "Hello"),
+    (5L, 5, "Hello"),
+    (6L, 6, "Hello"),
+    (7L, 7, "Hello World"),
+    (8L, 8, "Hello World"),
+    (20L, 20, "Hello World"))
 
   /** test selection **/
   @Test
@@ -171,4 +182,86 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val expected = mutable.MutableList("Hello", "Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    // for sum aggregation ensure that every time the order of each element is consistent
+    env.setParallelism(1)
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime()  RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW)" +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+    *  All aggregates must be computed on the same window.
+    */
+  @Test(expected = classOf[TableException])
+  def testMultiWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY b ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from T1"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index c1d39aa..e12572f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -129,4 +129,60 @@ class WindowAggregateTest extends TableTestBase {
     val expected = ""
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+        ),
+        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fb8f3b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
new file mode 100644
index 0000000..5f3f6ca
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamingWithStateTestBase.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.utils
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+
+import org.junit.rules.TemporaryFolder
+import org.junit.Rule
+
+class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
+
+  val _tempFolder = new TemporaryFolder
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  def getStateBackend: RocksDBStateBackend = {
+    val dbPath = tempFolder.newFolder().getAbsolutePath
+    val checkpointPath = tempFolder.newFolder().toURI.toString
+    val backend = new RocksDBStateBackend(checkpointPath)
+    backend.setDbStoragePath(dbPath)
+    backend
+  }
+}