Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/03 15:53:19 UTC

flink git commit: [FLINK-5219] [table] Add non-grouped session windows for batch tables.

Repository: flink
Updated Branches:
  refs/heads/master c31f95cab -> 728c936dd


[FLINK-5219] [table] Add non-grouped session windows for batch tables.

This closes #3266.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/728c936d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/728c936d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/728c936d

Branch: refs/heads/master
Commit: 728c936dd2ac18701e1d8696da251aec351b2ae6
Parents: c31f95c
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Fri Mar 3 10:39:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 3 16:52:13 2017 +0100

----------------------------------------------------------------------
 .../nodes/dataset/DataSetWindowAggregate.scala  |  96 ++++++---
 .../table/runtime/aggregate/AggregateUtil.scala |  57 +++++-
 ...ionWindowAggregateCombineGroupFunction.scala | 168 ----------------
 ...aSetSessionWindowAggregatePreProcessor.scala | 197 +++++++++++++++++++
 .../dataset/DataSetWindowAggregateITCase.scala  |  21 +-
 5 files changed, 331 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
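
For context, a "non-grouped" session window is one that groups on the window alone,
with no other keys, so all rows of the table fall into a single sequence of sessions.
A minimal Table API sketch of what this commit enables, modeled on the test added
below (the environment setup and field names are illustrative, not from this diff):

    // assumes a BatchTableEnvironment `tEnv` and a DataSet[(Long, String)] `ds`
    val table = ds.toTable(tEnv, 'long, 'string)

    val result = table
      .window(Session withGap 2.milli on 'long as 'w)  // event-time session window
      .groupBy('w)                                     // group on the window only
      .select('string.count, 'w.start, 'w.end)         // per-session count and bounds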


http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 597be8c..fb5ff3b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -194,33 +194,32 @@ class DataSetWindowAggregate(
     val groupingKeys = grouping.indices.toArray
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-    // grouping window
-    if (groupingKeys.length > 0) {
-      // create mapFunction for initializing the aggregations
-      val mapFunction = createDataSetWindowPrepareMapFunction(
-        window,
-        namedAggregates,
-        grouping,
-        inputType,
-        isParserCaseSensitive)
-
-      val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
+    // create mapFunction for initializing the aggregations
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType,
+      isParserCaseSensitive)
 
-      val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+    val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
 
-      // the position of the rowtime field in the intermediate result for map output
-      val rowTimeFieldPos = mapReturnType.getArity - 1
+    val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
 
-      // do incremental aggregation
-      if (doAllSupportPartialMerge(
-        namedAggregates.map(_.getKey),
-        inputType,
-        grouping.length)) {
+    // the position of the rowtime field in the intermediate result for map output
+    val rowTimeFieldPos = mapReturnType.getArity - 1
 
-        // gets the window-start and window-end positions in the intermediate result.
-        val windowStartPos = rowTimeFieldPos
-        val windowEndPos = windowStartPos + 1
+    // do incremental aggregation
+    if (doAllSupportPartialMerge(
+      namedAggregates.map(_.getKey),
+      inputType,
+      grouping.length)) {
 
+      // gets the window-start and window-end positions in the intermediate result.
+      val windowStartPos = rowTimeFieldPos
+      val windowEndPos = windowStartPos + 1
+      // grouping window
+      if (groupingKeys.length > 0) {
        // create groupCombineFunction for combining the aggregations
         val combineGroupFunction = createDataSetWindowAggregationCombineFunction(
           window,
@@ -248,9 +247,38 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
+      } else {
+        // non-grouping window
+        val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+          window,
+          namedAggregates,
+          inputType,
+          grouping)
+
+        // create groupReduceFunction for calculating the aggregations
+        val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties,
+          isInputCombined = true)
+
+        mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+          .mapPartition(mapPartitionFunction)
+          .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+          .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggregateOperatorName)
+          .asInstanceOf[DataSet[Row]]
       }
-      // do non-incremental aggregation
-      else {
+    // do non-incremental aggregation
+    } else {
+      // grouping window
+      if (groupingKeys.length > 0) {
+
         // create groupReduceFunction for calculating the aggregations
         val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
           window,
@@ -265,13 +293,23 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
+      } else {
+        // non-grouping window
+        val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggregateOperatorName)
+          .asInstanceOf[DataSet[Row]]
       }
     }
-    // non-grouping window
-    else {
-      throw new UnsupportedOperationException(
-        "Session non-grouping windows on event-time are currently not supported.")
-    }
   }
 
   private def prepareOperatorName: String = {
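
In outline, the new non-grouped branches mirror the grouped ones but pin the final
aggregation to a single parallel instance, since without grouping keys there is
nothing to partition on. A comment-annotated sketch of the incremental (combinable)
path assembled above; it reuses the values defined in the surrounding method and is
not runnable on its own:

    mappedInput
      // pre-aggregate within each partition: sort by rowtime, split sessions on the gap
      .sortPartition(rowTimeFieldPos, Order.ASCENDING)
      .mapPartition(mapPartitionFunction)
      // merge the partial session buffers globally, hence parallelism 1
      .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
      .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
      .reduceGroup(groupReduceFunction)
      .returns(rowTypeInfo)
      .name(aggregateOperatorName)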

http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 40468ad..d549c37 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{InvalidTypesException, MapFunction, RichGroupCombineFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -246,6 +246,57 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that performs the
+    * pre-aggregation for session window aggregates.
+    * The function returns aggregate values of all aggregate functions, which are
+    * organized by the following format:
+    *
+    * {{{
+    *       avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+    *           |                          |          windowEnd(max(rowtime))
+    *           |                          |                   |
+    *           v                          v                   v
+    *        +--------+--------+--------+--------+-----------+---------+
+    *        |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+    *        +--------+--------+--------+--------+-----------+---------+
+    *                               ^                 ^
+    *                               |                 |
+    *             sum(y) aggOffsetInRow = 4    windowStart(min(rowtime))
+    *
+    * }}}
+    *
+    */
+  def createDataSetWindowAggregationMapPartitionFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    groupings: Array[Int]): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        val combineReturnType: RowTypeInfo =
+          createDataSetAggregateBufferDataType(
+            groupings,
+            aggregates,
+            inputType,
+            Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+        new DataSetSessionWindowAggregatePreProcessor(
+          aggregates,
+          groupings,
+          asLong(gap),
+          combineReturnType)
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch")
+    }
+  }
+
+  /**
    * Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that performs the
    * pre-aggregation for aggregates.
    * The function returns intermediate aggregate values of all aggregate functions, which are
@@ -268,7 +319,7 @@ object AggregateUtil {
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       groupings: Array[Int])
-    : RichGroupCombineFunction[Row, Row] = {
+    : GroupCombineFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -284,7 +335,7 @@ object AggregateUtil {
             inputType,
             Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
 
-        new DataSetSessionWindowAggregateCombineGroupFunction(
+        new DataSetSessionWindowAggregatePreProcessor(
           aggregates,
           groupings,
           asLong(gap),
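
To make the intermediate layout concrete: a hypothetical buffer row as the
pre-processor emits it for a single count aggregate with one grouping key (the field
values are made up; in the non-grouped case the grouping key field is simply absent):

    import org.apache.flink.types.Row

    val acc: AnyRef = null        // stands in for an Accumulator of count('string)

    // layout: grouping keys | one accumulator per aggregate | windowStart | windowEnd
    val buffer = new Row(4)
    buffer.setField(0, "Hello")   // grouping key
    buffer.setField(1, acc)       // partial aggregate for this session
    buffer.setField(2, 1L)        // windowStart = min(rowtime) of the session so far
    buffer.setField(3, 6L)        // windowEnd   = max(rowtime) + gap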

http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
deleted file mode 100644
index 88cd19f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.flink.api.common.functions.RichGroupCombineFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * This wraps the aggregate logic inside of
-  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
-  *
-  * @param aggregates          The aggregate functions.
-  * @param groupingKeys        The indexes of the grouping fields.
-  * @param gap                 Session time window gap.
-  * @param intermediateRowType Intermediate row data type.
-  */
-class DataSetSessionWindowAggregateCombineGroupFunction(
-    aggregates: Array[AggregateFunction[_ <: Any]],
-    groupingKeys: Array[Int],
-    gap: Long,
-    @transient intermediateRowType: TypeInformation[Row])
-  extends RichGroupCombineFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  private var aggregateBuffer: Row = _
-  private val accumStartPos: Int = groupingKeys.length
-  private val rowTimeFieldPos = accumStartPos + aggregates.length
-
-  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
-    new JArrayList[Accumulator](2)
-  }
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupingKeys)
-    aggregateBuffer = new Row(rowTimeFieldPos + 2)
-
-    // init lists with two empty accumulators
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).add(accumulator)
-      accumulatorList(i).add(accumulator)
-    }
-  }
-
-  /**
-    * For sub-grouped intermediate aggregate Rows, divide window based on the rowtime
-    * (current'rowtime - previous’rowtime > gap), and then merge data (within a unified window)
-    * into an aggregate buffer.
-    *
-    * @param records Sub-grouped intermediate aggregate Rows.
-    * @return Combined intermediate aggregate Row.
-    *
-    */
-  override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    var windowStart: java.lang.Long = null
-    var windowEnd: java.lang.Long = null
-    var currentRowTime: java.lang.Long = null
-
-    // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).set(0, accumulator)
-    }
-
-    val iterator = records.iterator()
-
-    while (iterator.hasNext) {
-      val record = iterator.next()
-      currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
-      // initial traversal or opening a new window
-      if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
-
-        // calculate the current window and open a new window.
-        if (windowEnd != null) {
-          // emit the current window's merged data
-          doCollect(out, accumulatorList, windowStart, windowEnd)
-
-          // reset first value of accumulator list
-          for (i <- aggregates.indices) {
-            val accumulator = aggregates(i).createAccumulator()
-            accumulatorList(i).set(0, accumulator)
-          }
-        } else {
-          // set group keys to aggregateBuffer.
-          for (i <- groupingKeys.indices) {
-            aggregateBuffer.setField(i, record.getField(i))
-          }
-        }
-
-        windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
-      }
-
-      for (i <- aggregates.indices) {
-        // insert received accumulator into acc list
-        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
-        accumulatorList(i).set(1, newAcc)
-        // merge acc list
-        val retAcc = aggregates(i).merge(accumulatorList(i))
-        // insert result into acc list
-        accumulatorList(i).set(0, retAcc)
-      }
-
-      // the current rowtime is the last rowtime of the next calculation.
-      windowEnd = currentRowTime + gap
-    }
-    // emit the merged data of the current window.
-    doCollect(out, accumulatorList, windowStart, windowEnd)
-  }
-
-  /**
-    * Emit the merged data of the current window.
-    *
-    * @param out             the collection of the aggregate results
-    * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
-    *                        each aggregate
-    * @param windowStart     the window's start attribute value is the min (rowtime)
-    *                        of all rows in the window.
-    * @param windowEnd       the window's end property value is max (rowtime) + gap
-    *                        for all rows in the window.
-    */
-  def doCollect(
-      out: Collector[Row],
-      accumulatorList: Array[JArrayList[Accumulator]],
-      windowStart: Long,
-      windowEnd: Long): Unit = {
-
-    // merge the accumulators into one accumulator
-    for (i <- aggregates.indices) {
-      aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
-    }
-
-    // intermediate Row WindowStartPos is rowtime pos.
-    aggregateBuffer.setField(rowTimeFieldPos, windowStart)
-
-    // intermediate Row WindowEndPos is rowtime pos + 1.
-    aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
-
-    out.collect(aggregateBuffer)
-  }
-
-  override def getProducedType: TypeInformation[Row] = {
-    intermediateRowType
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
new file mode 100644
index 0000000..a299c40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates          The aggregate functions.
+  * @param groupingKeys        The indexes of the grouping fields.
+  * @param gap                 Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregatePreProcessor(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupingKeys: Array[Int],
+    gap: Long,
+    @transient intermediateRowType: TypeInformation[Row])
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row,Row]
+  with GroupCombineFunction[Row,Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private val accumStartPos: Int = groupingKeys.length
+  private val rowTimeFieldPos = accumStartPos + aggregates.length
+
+  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupingKeys)
+    aggregateBuffer = new Row(rowTimeFieldPos + 2)
+
+    // init lists with two empty accumulators
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+    }
+  }
+
+  /**
+    * For sub-grouped intermediate aggregate Rows, divides windows based on the rowtime
+    * (current rowtime - previous rowtime > gap), and then merges data (within a unified window)
+    * into an aggregate buffer.
+    *
+    * @param records  Sub-grouped intermediate aggregate Rows.
+    * @return Combined intermediate aggregate Row.
+    *
+    */
+  override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
+    preProcessing(records, out)
+  }
+
+  /**
+    * Divides windows based on the rowtime
+    * (current rowtime - previous rowtime > gap), and then merges data (within a unified window)
+    * into an aggregate buffer.
+    *
+    * @param records  Intermediate aggregate Rows.
+    * @return Pre-aggregated intermediate aggregate Rows.
+    *
+    */
+  override def mapPartition(records: Iterable[Row], out: Collector[Row]): Unit = {
+    preProcessing(records, out)
+  }
+
+  /**
+    * For intermediate aggregate Rows, divides windows based on the rowtime
+    * (current rowtime - previous rowtime > gap), and then merges data (within a unified window)
+    * into an aggregate buffer.
+    *
+    * @param records Intermediate aggregate Rows.
+    * @return Pre-processed intermediate aggregate Rows.
+    *
+    */
+  private def preProcessing(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var windowStart: java.lang.Long = null
+    var windowEnd: java.lang.Long = null
+    var currentRowTime: java.lang.Long = null
+
+    // reset first accumulator in merge list
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+    }
+
+    val iterator = records.iterator()
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+      // initial traversal or opening a new window
+      if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
+
+        // calculate the current window and open a new window.
+        if (windowEnd != null) {
+          // emit the current window's merged data
+          doCollect(out, accumulatorList, windowStart, windowEnd)
+
+          // reset first value of accumulator list
+          for (i <- aggregates.indices) {
+            val accumulator = aggregates(i).createAccumulator()
+            accumulatorList(i).set(0, accumulator)
+          }
+        } else {
+          // set group keys to aggregateBuffer.
+          for (i <- groupingKeys.indices) {
+            aggregateBuffer.setField(i, record.getField(i))
+          }
+        }
+
+        windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+      }
+
+      for (i <- aggregates.indices) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+      }
+
+      // extend the window end to the current rowtime plus the gap.
+      windowEnd = currentRowTime + gap
+    }
+    // emit the merged data of the current window.
+    doCollect(out, accumulatorList, windowStart, windowEnd)
+  }
+
+  /**
+    * Emit the merged data of the current window.
+    *
+    * @param out             the collection of the aggregate results
+    * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
+    *                        each aggregate
+    * @param windowStart     the window's start attribute value is the min (rowtime)
+    *                        of all rows in the window.
+    * @param windowEnd       the window's end property value is max (rowtime) + gap
+    *                        for all rows in the window.
+    */
+  def doCollect(
+      out: Collector[Row],
+      accumulatorList: Array[JArrayList[Accumulator]],
+      windowStart: Long,
+      windowEnd: Long): Unit = {
+
+    // merge the accumulators into one accumulator
+    for (i <- aggregates.indices) {
+      aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+    }
+
+    // intermediate Row WindowStartPos is rowtime pos.
+    aggregateBuffer.setField(rowTimeFieldPos, windowStart)
+
+    // intermediate Row WindowEndPos is rowtime pos + 1.
+    aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
+
+    out.collect(aggregateBuffer)
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    intermediateRowType
+  }
+}
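
The heart of preProcessing above is the gap test: rows arrive sorted by rowtime, and
a row whose time exceeds the running windowEnd (last rowtime + gap) closes the current
session and opens a new one. A self-contained sketch of just that splitting logic over
plain timestamps (the helper name and signature are ours, not part of the class above;
times are assumed sorted and non-negative):

    // assigns sorted event times to session windows with the given gap
    def sessionize(times: Seq[Long], gap: Long): List[(Long, Long)] = {
      val sessions = scala.collection.mutable.ListBuffer.empty[(Long, Long)]
      var start, end = -1L
      for (t <- times) {
        if (end < 0 || t > end) {          // first row, or gap exceeded: new session
          if (end >= 0) sessions += ((start, end))
          start = t
        }
        end = t + gap                      // windowEnd = latest rowtime + gap
      }
      if (end >= 0) sessions += ((start, end))
      sessions.toList
    }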

http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index 071f0ee..882f4b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -42,7 +42,7 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
     (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
     (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
-    (6L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
     (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
     (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
     (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"))
@@ -152,23 +152,28 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
       "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" +
       "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" +
-      "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.013\n" +
+      "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" +
       "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testAlldEventTimeSessionGroupWindow(): Unit = {
-    // Non-grouping Session window on event-time are currently not supported
+  @Test
+  def testAllEventTimeSessionGroupWindow(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     val table = env
       .fromCollection(data)
       .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
-    val windowedTable =table
-      .window(Session withGap 7.milli on 'long as 'w)
+
+    val results = table
+      .window(Session withGap 2.milli on 'long as 'w)
       .groupBy('w)
-      .select('string.count).toDataSet[Row].collect()
+      .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect()
+
+    val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" +
+      "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" +
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test(expected = classOf[ValidationException])
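
For reference, the expected values follow directly from the session definition. With a
2 ms gap over the rowtimes 1, 2, 3, 4, 7, 8 and 16, the sessions are (1,6) with four
rows, (7,10) with two rows and (16,18) with one row. The data change from 6L to 7L also
explains the grouped-test update above: the "Hello" rows now carry rowtimes 3, 4 and 7,
so with a 7 ms gap that session ends at 7 + 7 = 14 ms (00:00:00.014). A quick check
with the sessionize helper sketched earlier:

    sessionize(Seq(1L, 2L, 3L, 4L, 7L, 8L, 16L), gap = 2L)
    // -> List((1,6), (7,10), (16,18))
    sessionize(Seq(3L, 4L, 7L), gap = 7L)
    // -> List((3,14))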