Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 16:03:23 UTC

[1/2] flink git commit: [FLINK-5047] [table] Add sliding group-windows for batch tables

Repository: flink
Updated Branches:
  refs/heads/master bec818d84 -> 31a57c5a8


http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index d57f4f7..77ea66e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -21,17 +21,16 @@ package org.apache.flink.table.runtime.dataset
 import java.math.BigDecimal
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
 import scala.collection.JavaConverters._
 
@@ -197,4 +196,162 @@ class DataSetWindowAggregateITCase(
       .toDataSet[Row]
   }
 
+  // ----------------------------------------------------------------------------------------------
+  // Sliding windows
+  // ----------------------------------------------------------------------------------------------
+
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    // Count sliding group windows on event-time are currently not supported
+    table
+      .window(Slide over 2.rows every 2.rows on 'long as 'w)
+      .groupBy('w)
+      .select('int.count)
+      .toDataSet[Row]
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 2.milli on 'long as 'w)
+      .groupBy('w)
+      .select('int.count, 'w.start, 'w.end)
+
+    val expected =
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" +
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
+      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 4.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 10.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataStream variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected =
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
new file mode 100644
index 0000000..85a2373
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.TimestampWithEqualWatermark
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  // ----------------------------------------------------------------------------------------------
+  // Sliding windows
+  // ----------------------------------------------------------------------------------------------
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 2.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
+      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
+      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
+      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
+      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 4.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataSet variant
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object DataStreamAggregateITCase {
+  class TimestampWithEqualWatermark
+  extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, Double, Float, BigDecimal, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, Double, Float, BigDecimal, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}


[2/2] flink git commit: [FLINK-5047] [table] Add sliding group-windows for batch tables

Posted by tw...@apache.org.
[FLINK-5047] [table] Add sliding group-windows for batch tables

This closes #3364.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a57c5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a57c5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a57c5a

Branch: refs/heads/master
Commit: 31a57c5a89d6d22ccb629c2adfe4ffb87441e6dd
Parents: bec818d
Author: twalthr <tw...@apache.org>
Authored: Wed Jan 18 16:56:02 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 17:01:27 2017 +0100

----------------------------------------------------------------------
 .../table/functions/AggregateFunction.scala     |   8 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  | 117 ++++++++-
 .../table/runtime/aggregate/AggregateUtil.scala | 236 ++++++++++++++++---
 ...SetSessionWindowAggReduceGroupFunction.scala | 201 ++++++++++++++++
 ...sionWindowAggregateReduceGroupFunction.scala | 201 ----------------
 ...taSetSlideTimeWindowAggFlatMapFunction.scala |  63 +++++
 ...tSlideTimeWindowAggReduceGroupFunction.scala | 202 ++++++++++++++++
 ...SetSlideWindowAggReduceCombineFunction.scala | 117 +++++++++
 ...taSetSlideWindowAggReduceGroupFunction.scala | 141 +++++++++++
 ...umbleCountWindowAggReduceGroupFunction.scala |   3 -
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   3 +-
 .../aggregate/DataSetWindowAggMapFunction.scala | 112 +++++++++
 .../DataSetWindowAggregateMapFunction.scala     | 111 ---------
 .../IncrementalAggregateAllWindowFunction.scala |   7 +-
 .../scala/stream/table/AggregationsITCase.scala |  43 +---
 .../dataset/DataSetWindowAggregateITCase.scala  | 163 ++++++++++++-
 .../datastream/DataStreamAggregateITCase.scala  | 235 ++++++++++++++++++
 17 files changed, 1566 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
index 967d2ea..773c71f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def getValue(accumulator: Accumulator): T
 
   /**
-    * Process the input values and update the provided accumulator instance.
+    * Processes the input values and updates the provided accumulator instance.
     *
     * @param accumulator the accumulator which contains the current
     *                    aggregated results
@@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def accumulate(accumulator: Accumulator, input: Any): Unit
 
   /**
-    * Merge a list of accumulator instances into one accumulator instance.
+    * Merges a list of accumulator instances into one accumulator instance.
     *
-    * IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the
+    * IMPORTANT: You may only return a new accumulator instance or the first accumulator of the
     * input list. If you return another instance, the result of the aggregation function might be
     * incorrect.
     *
@@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
     *
     * @return The type information for the accumulator.
     */
-  def getAccumulatorType(): TypeInformation[_] = null
+  def getAccumulatorType: TypeInformation[_] = null
 }
 
 /**

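A note on the merge() contract documented above (you may only return a new accumulator
instance or the first accumulator of the input list): the sketch below shows a minimal
aggregate that honors it. The names MyCountAccumulator and MyCountAggFunction are made up
for illustration, and it assumes Accumulator is a plain marker trait; only AggregateFunction,
Accumulator and the method signatures come from this patch.

    import java.util.{List => JList}

    import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

    // hypothetical accumulator holding a running count
    class MyCountAccumulator extends Accumulator {
      var count: Long = 0L
    }

    class MyCountAggFunction extends AggregateFunction[Long] {

      override def createAccumulator(): Accumulator = new MyCountAccumulator

      override def getValue(accumulator: Accumulator): Long =
        accumulator.asInstanceOf[MyCountAccumulator].count

      override def accumulate(accumulator: Accumulator, input: Any): Unit = {
        if (input != null) {
          accumulator.asInstanceOf[MyCountAccumulator].count += 1
        }
      }

      // merges everything into the FIRST accumulator of the list and returns it,
      // which is exactly what the contract above allows
      def merge(accumulators: JList[Accumulator]): Accumulator = {
        val first = accumulators.get(0).asInstanceOf[MyCountAccumulator]
        var i = 1
        while (i < accumulators.size()) {
          first.count += accumulators.get(i).asInstanceOf[MyCountAccumulator].count
          i += 1
        }
        first
      }
    }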
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index fb5ff3b..a94deb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -111,17 +111,25 @@ class DataSetWindowAggregate(
 
     // whether identifiers are matched case-sensitively
     val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
+
     window match {
       case EventTimeTumblingGroupWindow(_, _, size) =>
         createEventTimeTumblingWindowDataSet(
           inputDS,
           isTimeInterval(size.resultType),
           caseSensitive)
+
       case EventTimeSessionGroupWindow(_, _, gap) =>
         createEventTimeSessionWindowDataSet(inputDS, caseSensitive)
-      case EventTimeSlidingGroupWindow(_, _, _, _) =>
-        throw new UnsupportedOperationException(
-          "Event-time sliding windows in a batch environment are currently not supported")
+
+      case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+        createEventTimeSlidingWindowDataSet(
+          inputDS,
+          isTimeInterval(size.resultType),
+          asLong(size),
+          asLong(slide),
+          caseSensitive)
+
       case _: ProcessingTimeGroupWindow =>
         throw new UnsupportedOperationException(
           "Processing-time tumbling windows are not supported in a batch environment, " +
@@ -130,7 +138,6 @@ class DataSetWindowAggregate(
     }
   }
 
-
   private def createEventTimeTumblingWindowDataSet(
       inputDS: DataSet[Row],
       isTimeWindow: Boolean,
@@ -312,6 +319,108 @@ class DataSetWindowAggregate(
     }
   }
 
+  private def createEventTimeSlidingWindowDataSet(
+      inputDS: DataSet[Row],
+      isTimeWindow: Boolean,
+      size: Long,
+      slide: Long,
+      isParserCaseSensitive: Boolean)
+    : DataSet[Row] = {
+
+    // create MapFunction for initializing the aggregations
+    // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType,
+      isParserCaseSensitive)
+
+    val mappedDataSet = inputDS
+      .map(mapFunction)
+      .name(prepareOperatorName)
+
+    val mapReturnType = mappedDataSet.getType
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val groupingKeys = grouping.indices.toArray
+
+    // do partial aggregation if possible
+    val isPartial = doAllSupportPartialMerge(
+      namedAggregates.map(_.getKey),
+      inputType,
+      grouping.length)
+
+    // only pre-tumble if it is worth it
+    val isLittleTumblingSize = determineLargestTumblingSize(size, slide) <= 1
+
+    val preparedDataSet = if (isTimeWindow) {
+      // time window
+
+      if (isPartial && !isLittleTumblingSize) {
+        // partial aggregates
+
+        val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
+
+        // create GroupReduceFunction
+        // for pre-tumbling and replicating/omitting the content for each pane
+        val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+          window,
+          namedAggregates,
+          grouping,
+          inputType,
+          isParserCaseSensitive)
+
+        mappedDataSet.asInstanceOf[DataSet[Row]]
+          .groupBy(groupingKeysAndAlignedRowtime: _*)
+          .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
+          .name(prepareOperatorName)
+      } else {
+        // non-partial aggregates
+
+        // create FlatMapFunction
+        // for replicating/omitting the content for each pane
+        val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
+          window,
+          namedAggregates,
+          grouping,
+          mapReturnType,
+          isParserCaseSensitive)
+
+        mappedDataSet
+          .flatMap(prepareFlatMapFunction) // replicates/omits
+      }
+    } else {
+      // count window
+
+      throw new UnsupportedOperationException(
+          "Count sliding group windows on event-time are currently not supported.")
+    }
+
+    val prepareReduceReturnType = preparedDataSet.getType
+
+    // create GroupReduceFunction for final aggregation and conversion to output row
+    val aggregateReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+      window,
+      namedAggregates,
+      inputType,
+      rowRelDataType,
+      grouping,
+      namedProperties,
+      isInputCombined = false)
+
+    // gets the window-start position in the intermediate result.
+    val windowStartPos = prepareReduceReturnType.getArity - 1
+
+    val groupingKeysAndWindowStart = groupingKeys :+ windowStartPos
+
+    preparedDataSet
+      .groupBy(groupingKeysAndWindowStart: _*)
+      .reduceGroup(aggregateReduceFunction)
+      .returns(rowTypeInfo)
+      .name(aggregateOperatorName)
+  }
+
   private def prepareOperatorName: String = {
     val aggString = aggregationToString(
       inputType,

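An editorial note on the time-window branch above: after the (optional) pre-tumbling, each
prepared record is replicated into every pane of the sliding window that its timestamp falls
into; that is what the "replicates/omits" comments refer to. The following is a hedged,
self-contained sketch of that timestamp-to-panes mapping (the helper name windowStartsFor is
hypothetical; this is not the patch's actual operator code):

    // all window starts of the sliding windows (size, slide) that contain ts;
    // an empty result is the "omit" case of non-overlapping windows (slide > size)
    def windowStartsFor(ts: Long, size: Long, slide: Long): Seq[Long] = {
      // aligned start of the last window that could contain ts
      val lastStart = ts - Math.floorMod(ts, slide)
      // walk back one slide at a time while the window still covers ts
      Iterator.iterate(lastStart)(_ - slide)
        .takeWhile(start => start > ts - size)
        .toSeq
    }

    // e.g. windowStartsFor(8L, size = 5L, slide = 2L) == Seq(8L, 6L, 4L),
    // i.e. the panes [8, 13), [6, 11) and [4, 9) -- matching the ITCase above,
    // where the record with rowtime 8 is counted in exactly those three windows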
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index acb6cd0..4900b1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -21,26 +21,26 @@ import java.util
 
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
+import org.apache.flink.api.common.functions.{FlatMapFunction, GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
-import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
 
@@ -160,23 +160,37 @@ object AggregateUtil {
         groupings,
         aggregates,
         inputType,
-        Some(Array(Types.LONG)))
+        Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
 
     val (timeFieldPos, tumbleTimeWindowSize) = window match {
+      case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, Some(asLong(size)))
+
       case EventTimeTumblingGroupWindow(_, time, size) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        size match {
-          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-            (timeFieldPos, Some(value))
-          case _ => (timeFieldPos, None)
-        }
+        (timeFieldPos, None)
+
       case EventTimeSessionGroupWindow(_, time, _) =>
-        (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, None)
+
+      case EventTimeSlidingGroupWindow(_, time, size, slide)
+          if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
+        // pre-tumble incremental aggregates on time-windows
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+        (timeFieldPos, Some(preTumblingSize))
+
+      case EventTimeSlidingGroupWindow(_, time, _, _) =>
+        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+        (timeFieldPos, None)
+
       case _ =>
         throw new UnsupportedOperationException(s"$window is currently not supported on batch")
     }
 
-    new DataSetWindowAggregateMapFunction(
+    new DataSetWindowAggMapFunction(
       aggregates,
       aggFieldIndexes,
       groupings,
@@ -186,6 +200,116 @@ object AggregateUtil {
   }
 
   /**
+    * Creates a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+    * partial aggregates of sliding windows (time and count-windows).
+    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions, and the aligned window start. The window start need not be a
+    * timestamp; it can also be a count value for count-windows.
+    *
+    * The output is stored in Row by the following format:
+    *
+    * {{{
+    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+    *                            |                          |
+    *                            v                          v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                              ^                 ^
+    *                                              |                 |
+    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+    */
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      needRetraction = false)._2
+
+    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        // sliding time-window for partial aggregations
+        new DataSetSlideTimeWindowAggReduceGroupFunction(
+          aggregates,
+          groupings.length,
+          returnType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          returnType)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+    }
+  }
+
+  /**
+    * Creates a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+    * non-incremental aggregates of sliding windows (time-windows).
+    *
+    * It requires a prepared input (with intermediate aggregate fields), aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions, and the aligned window start.
+    *
+    * The output is stored in Row by the following format:
+    *
+    * {{{
+    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+    *                            |                          |
+    *                            v                          v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                              ^                 ^
+    *                                              |                 |
+    *                                 sum(y) aggOffsetInRow = 4      window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for time-based sliding windows on batch tables.
+    */
+  def createDataSetSlideWindowPrepareFlatMapFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: TypeInformation[Row],
+      isParserCaseSensitive: Boolean)
+    : FlatMapFunction[Row, Row] = {
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        new DataSetSlideTimeWindowAggFlatMapFunction(
+          inputType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          inputType)
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"$window is currently not supported in a batch environment.")
+    }
+  }
+
+  /**
     * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute window
    * aggregates on batch tables. If all aggregates support partial aggregation and it is a time
     * window, the [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
@@ -203,10 +327,10 @@ object AggregateUtil {
       isInputCombined: Boolean = false)
     : RichGroupReduceFunction[Row, Row] = {
 
-    val aggregates = transformToAggregateFunctions(
+    val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
       inputType,
-      needRetraction = false)._2
+      needRetraction = false)
 
     // the mapping relation between field index of intermediate aggregate Row and output Row.
     val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -259,7 +383,7 @@ object AggregateUtil {
 
       case EventTimeSessionGroupWindow(_, _, gap) =>
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-        new DataSetSessionWindowAggregateReduceGroupFunction(
+        new DataSetSessionWindowAggReduceGroupFunction(
           aggregates,
           groupingOffsetMapping,
           aggOffsetMapping,
@@ -268,6 +392,42 @@ object AggregateUtil {
           endPos,
           asLong(gap),
           isInputCombined)
+
+      case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
+        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+        if (doAllSupportPartialMerge(aggregates)) {
+          // for partial aggregations
+          new DataSetSlideWindowAggReduceCombineFunction(
+            aggregates,
+            groupingOffsetMapping,
+            aggOffsetMapping,
+            outputType.getFieldCount,
+            startPos,
+            endPos,
+            asLong(size))
+        }
+        else {
+          // for non-partial aggregations
+          new DataSetSlideWindowAggReduceGroupFunction(
+            aggregates,
+            groupingOffsetMapping,
+            aggOffsetMapping,
+            outputType.getFieldCount,
+            startPos,
+            endPos,
+            asLong(size))
+        }
+
+      case EventTimeSlidingGroupWindow(_, _, size, _) =>
+        new DataSetSlideWindowAggReduceGroupFunction(
+            aggregates,
+            groupingOffsetMapping,
+            aggOffsetMapping,
+            outputType.getFieldCount,
+            None,
+            None,
+            asLong(size))
+
       case _ =>
         throw new UnsupportedOperationException(s"$window is currently not supported on batch")
     }
@@ -355,6 +515,7 @@ object AggregateUtil {
       needRetraction = false)._2
 
     window match {
+
       case EventTimeSessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo =
           createDataSetAggregateBufferDataType(
@@ -368,6 +529,7 @@ object AggregateUtil {
           groupings,
           asLong(gap),
           combineReturnType)
+
       case _ =>
         throw new UnsupportedOperationException(
           s" [ ${window.getClass.getCanonicalName.split("\\.").last} ] is currently not " +
@@ -662,7 +824,8 @@ object AggregateUtil {
       }
       val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
       aggregateCall.getAggregation match {
-        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+
+        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
           if (needRetraction) {
             aggregates(index) = sqlTypeName match {
               case TINYINT =>
@@ -702,8 +865,8 @@ object AggregateUtil {
                 throw new TableException("Sum aggregate does no support type:" + sqlType)
             }
           }
-        }
-        case _: SqlAvgAggFunction => {
+
+        case _: SqlAvgAggFunction =>
           aggregates(index) = sqlTypeName match {
             case TINYINT =>
               new ByteAvgAggFunction
@@ -722,8 +885,8 @@ object AggregateUtil {
             case sqlType: SqlTypeName =>
               throw new TableException("Avg aggregate does no support type:" + sqlType)
           }
-        }
-        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+
+        case sqlMinMaxFunction: SqlMinMaxAggFunction =>
           aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
             if (needRetraction) {
               sqlTypeName match {
@@ -815,9 +978,10 @@ object AggregateUtil {
               }
             }
           }
-        }
+
         case _: SqlCountAggFunction =>
           aggregates(index) = new CountAggFunction
+
         case unSupported: SqlAggFunction =>
           throw new TableException("unsupported Function: " + unSupported.getName)
       }
@@ -833,7 +997,7 @@ object AggregateUtil {
     val aggTypes: Seq[TypeInformation[_]] =
       aggregates.map {
         agg =>
-          val accType = agg.getAccumulatorType()
+          val accType = agg.getAccumulatorType
           if (accType != null) {
             accType
           } else {
@@ -969,10 +1133,22 @@ object AggregateUtil {
     }
   }
 
-  private def asLong(expr: Expression): Long = expr match {
+  private[flink] def asLong(expr: Expression): Long = expr match {
     case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value
     case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
     case _ => throw new IllegalArgumentException()
   }
+
+  private[flink] def determineLargestTumblingSize(size: Long, slide: Long): Long = {
+    if (slide > size) {
+      gcd(slide, size)
+    } else {
+      gcd(size, slide)
+    }
+  }
+
+  private def gcd(a: Long, b: Long): Long = {
+    if (b == 0) a else gcd(b, a % b)
+  }
 }
 

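To make the pre-tumbling decision above concrete: determineLargestTumblingSize returns the
largest pane width that evenly tiles both the window size and the slide, i.e. their greatest
common divisor, and DataSetWindowAggregate skips pre-tumbling when that width is <= 1
(isLittleTumblingSize). A quick check against the two sliding windows used in the ITCases
(a usage sketch; note the method is private[flink] in AggregateUtil):

    // Slide over 10.milli every 5.milli -> 5 ms panes, pre-tumbling pays off
    AggregateUtil.determineLargestTumblingSize(10L, 5L)  // == 5

    // Slide over 5.milli every 4.milli -> gcd(5, 4) == 1, so isLittleTumblingSize
    // holds and the plan takes the non-partial flat-map path instead
    AggregateUtil.determineLargestTumblingSize(5L, 4L)   // == 1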
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..1f19687
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It wraps the aggregate logic inside a
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for session
+  * time-windows on batch.
+  *
+  * Note:
+  *
+  * This can handle two input types (depending on whether the input is combined or not):
+  *
+  *  1. when partial aggregation is not supported, the input data structure of reduce is
+  * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregation is supported, the input data structure of reduce is
+  * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates             The aggregate functions.
+  * @param groupKeysMapping       The index mapping of group keys between intermediate aggregate Row
+  *                               and output Row.
+  * @param aggregateMapping       The index mapping between aggregate function list and
+  *                               aggregated value index in output Row.
+  * @param finalRowArity          The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos   The relative window-end field position.
+  * @param gap                    Session time window gap.
+  */
+class DataSetSessionWindowAggReduceGroupFunction(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    gap: Long,
+    isInputCombined: Boolean)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private val accumStartPos: Int = groupKeysMapping.length
+  private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2
+  private val intermediateRowWindowStartPos = intermediateRowArity - 2
+  private val intermediateRowWindowEndPos = intermediateRowArity - 1
+
+  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowArity)
+    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+
+    // init lists with two empty accumulators
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+    }
+  }
+
+  /**
+    * For grouped intermediate aggregate Rows, divides the records into windows according to
+    * the window-start and window-end, merges the data of each window into an aggregate
+    * buffer, computes the aggregated values from that buffer, and sets them into the output
+    * Row based on the mapping between intermediate aggregate data and output data.
+    *
+    * @param records Grouped intermediate aggregate Rows iterator.
+    * @param out     The collector to hand results to.
+    *
+    */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var windowStart: java.lang.Long = null
+    var windowEnd: java.lang.Long = null
+    var currentRowTime: java.lang.Long = null
+
+    // reset first accumulator in merge list
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+    }
+
+    val iterator = records.iterator()
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+      // initial traversal or opening a new window
+      if (null == windowEnd || currentRowTime > windowEnd) {
+
+        // calculate the current window and open a new window
+        if (null != windowEnd) {
+          // evaluate and emit the current window's result.
+          doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
+
+          // reset first accumulator in list
+          for (i <- aggregates.indices) {
+            val accumulator = aggregates(i).createAccumulator()
+            accumulatorList(i).set(0, accumulator)
+          }
+        } else {
+          // set group keys value to final output.
+          groupKeysMapping.foreach {
+            case (after, previous) =>
+              output.setField(after, record.getField(previous))
+          }
+        }
+
+        windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+      }
+
+      for (i <- aggregates.indices) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+      }
+
+      windowEnd = if (isInputCombined) {
+        // partial aggregate is supported
+        record.getField(intermediateRowWindowEndPos).asInstanceOf[Long]
+      } else {
+        // partial aggregation is not supported: window-end equals rowtime + gap
+        currentRowTime + gap
+      }
+    }
+    // evaluate and emit the current window's result.
+    doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
+  }
+
+  /**
+    * Evaluates and emits the data of the current window.
+    *
+    * @param out             the collector for the aggregate results
+    * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
+    *                        each aggregate
+    * @param windowStart     the window's start attribute value, i.e., the min rowtime of
+    *                        all rows in the window.
+    * @param windowEnd       the window's end attribute value, i.e., the max rowtime + gap
+    *                        of all rows in the window.
+    */
+  def doEvaluateAndCollect(
+      out: Collector[Row],
+      accumulatorList: Array[JArrayList[Accumulator]],
+      windowStart: Long,
+      windowEnd: Long): Unit = {
+
+    // merge the accumulators and then get value for the final output
+    aggregateMapping.foreach {
+      case (after, previous) =>
+        val agg = aggregates(previous)
+        output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+    }
+
+    // adds TimeWindow properties to output, then emits the output
+    if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+      collector.wrappedCollector = out
+      collector.windowStart = windowStart
+      collector.windowEnd = windowEnd
+
+      collector.collect(output)
+    } else {
+      out.collect(output)
+    }
+  }
+
+}

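For intuition on the reduce() logic above, in the uncombined case where the window end is
rowtime + gap: a sorted group of rows is cut into session windows whenever the next rowtime
exceeds the current window end. Below is a stand-alone sketch with a hypothetical helper name
(sessionWindows); it is not taken from the patch:

    // splits sorted rowtimes into session windows (start, end), end = last rowtime + gap
    def sessionWindows(rowtimes: Seq[Long], gap: Long): Seq[(Long, Long)] =
      rowtimes.sorted.foldLeft(List.empty[(Long, Long)]) {
        case ((start, end) :: rest, t) if t <= end =>
          (start, t + gap) :: rest      // still inside the session: extend its end
        case (acc, t) =>
          (t, t + gap) :: acc           // gap exceeded (or first row): open a new session
      }.reverse

    // sessionWindows(Seq(1L, 2L, 10L), gap = 3L) == Seq((1L, 5L), (10L, 13L))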
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
deleted file mode 100644
index ebef211..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * It wraps the aggregate logic inside of
-  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
-  * on batch.
-  *
-  * Note:
-  *
-  * This can handle two input types (depending if input is combined or not):
-  *
-  *  1. when partial aggregate is not supported, the input data structure of reduce is
-  * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
-  *  2. when partial aggregate is supported, the input data structure of reduce is
-  * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
-  *
-  * @param aggregates             The aggregate functions.
-  * @param groupKeysMapping       The index mapping of group keys between intermediate aggregate Row
-  *                               and output Row.
-  * @param aggregateMapping       The index mapping between aggregate function list and
-  *                               aggregated value index in output Row.
-  * @param finalRowArity          The output row field count.
-  * @param finalRowWindowStartPos The relative window-start field position.
-  * @param finalRowWindowEndPos   The relative window-end field position.
-  * @param gap                    Session time window gap.
-  */
-class DataSetSessionWindowAggregateReduceGroupFunction(
-    aggregates: Array[AggregateFunction[_ <: Any]],
-    groupKeysMapping: Array[(Int, Int)],
-    aggregateMapping: Array[(Int, Int)],
-    finalRowArity: Int,
-    finalRowWindowStartPos: Option[Int],
-    finalRowWindowEndPos: Option[Int],
-    gap: Long,
-    isInputCombined: Boolean)
-  extends RichGroupReduceFunction[Row, Row] {
-
-  private var aggregateBuffer: Row = _
-  private var output: Row = _
-  private var collector: TimeWindowPropertyCollector = _
-  private val accumStartPos: Int = groupKeysMapping.length
-  private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2
-  private val intermediateRowWindowStartPos = intermediateRowArity - 2
-  private val intermediateRowWindowEndPos = intermediateRowArity - 1
-
-  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
-    new JArrayList[Accumulator](2)
-  }
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    aggregateBuffer = new Row(intermediateRowArity)
-    output = new Row(finalRowArity)
-    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
-
-    // init lists with two empty accumulators
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).add(accumulator)
-      accumulatorList(i).add(accumulator)
-    }
-  }
-
-  /**
-    * For grouped intermediate aggregate Rows, divide window according to the window-start
-    * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
-    * aggregated values output from aggregate buffer, and then set them into output
-    * Row based on the mapping relationship between intermediate aggregate data and output data.
-    *
-    * @param records Grouped intermediate aggregate Rows iterator.
-    * @param out     The collector to hand results to.
-    *
-    */
-  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    var windowStart: java.lang.Long = null
-    var windowEnd: java.lang.Long = null
-    var currentRowTime: java.lang.Long = null
-
-    // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).set(0, accumulator)
-    }
-
-    val iterator = records.iterator()
-
-    while (iterator.hasNext) {
-      val record = iterator.next()
-      currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
-      // initial traversal or opening a new window
-      if (null == windowEnd ||
-        (null != windowEnd && currentRowTime > windowEnd)) {
-
-        // calculate the current window and open a new window
-        if (null != windowEnd) {
-          // evaluate and emit the current window's result.
-          doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
-
-          // reset first accumulator in list
-          for (i <- aggregates.indices) {
-            val accumulator = aggregates(i).createAccumulator()
-            accumulatorList(i).set(0, accumulator)
-          }
-        } else {
-          // set group keys value to final output.
-          groupKeysMapping.foreach {
-            case (after, previous) =>
-              output.setField(after, record.getField(previous))
-          }
-        }
-
-        windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
-      }
-
-      for (i <- aggregates.indices) {
-        // insert received accumulator into acc list
-        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
-        accumulatorList(i).set(1, newAcc)
-        // merge acc list
-        val retAcc = aggregates(i).merge(accumulatorList(i))
-        // insert result into acc list
-        accumulatorList(i).set(0, retAcc)
-      }
-
-      windowEnd = if (isInputCombined) {
-        // partial aggregate is supported
-        record.getField(intermediateRowWindowEndPos).asInstanceOf[Long]
-      } else {
-        // partial aggregate is not supported, window-start equal rowtime + gap
-        currentRowTime + gap
-      }
-    }
-    // evaluate and emit the current window's result.
-    doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
-  }
-
-  /**
-    * Evaluate and emit the data of the current window.
-    *
-    * @param out             the collection of the aggregate results
-    * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
-    *                        each aggregate
-    * @param windowStart     the window's start attribute value is the min (rowtime) of all rows
-    *                        in the window.
-    * @param windowEnd       the window's end property value is max (rowtime) + gap for all rows
-    *                        in the window.
-    */
-  def doEvaluateAndCollect(
-      out: Collector[Row],
-      accumulatorList: Array[JArrayList[Accumulator]],
-      windowStart: Long,
-      windowEnd: Long): Unit = {
-
-    // merge the accumulators and then get value for the final output
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        val agg = aggregates(previous)
-        output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
-    }
-
-    // adds TimeWindow properties to output then emit output
-    if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
-      collector.wrappedCollector = out
-      collector.windowStart = windowStart
-      collector.windowEnd = windowEnd
-
-      collector.collect(output)
-    } else {
-      out.collect(output)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
new file mode 100644
index 0000000..5f37b8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for time-based sliding windows on batch tables. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for the different panes of a
+  * sliding window. It is used for non-partial aggregations.
+  *
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def flatMap(record: Row, out: Collector[Row]): Unit = {
+    val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
+
+    // adapted from SlidingEventTimeWindows.assignWindows
+    var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
+
+    // adapted from SlidingEventTimeWindows.assignWindows
+    while (start > windowStart - windowSize) {
+      record.setField(timeFieldPos, start)
+      out.collect(record)
+      start -= windowSlide
+    }
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    returnType
+  }
+}
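
The flatMap above replicates each record into every pane of the sliding window that
contains its timestamp. A minimal sketch of the pane arithmetic, assuming offset 0 and
non-negative timestamps (so TimeWindow.getWindowStartWithOffset reduces to
ts - ts % slide):

  def paneStarts(timestamp: Long, size: Long, slide: Long): Seq[Long] = {
    // latest window start that still covers the timestamp
    val lastStart = timestamp - timestamp % slide
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestamp - size).toSeq
  }

  paneStarts(timestamp = 13L, size = 10L, slide = 5L) // Seq(10, 5): windows [10,20) and [5,15)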

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..5db3acb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for time-based sliding windows on batch tables. It takes a prepared input row
+  * (with the rowtime aligned for pre-tumbling), pre-aggregates (pre-tumbles) rows, aligns the
+  * window start, and replicates or omits records for the different panes of a sliding window.
+  *
+  * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], but it does
+  * not perform the final aggregate evaluation. It also includes the logic of
+  * [[DataSetSlideTimeWindowAggFlatMapFunction]].
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggReduceGroupFunction(
+    private val aggregates: Array[AggregateFunction[_ <: Any]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with CombineFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+
+  protected var intermediateRow: Row = _
+  // add one field to store window start
+  protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1
+  protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+
+  override def open(config: Configuration) {
+    intermediateRow = new Row(intermediateRowArity)
+
+    // init lists with two empty accumulators
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+      i += 1
+    }
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // reset first accumulator
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+      i += 1
+    }
+
+    val iterator = records.iterator()
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+
+      // accumulate
+      i = 0
+      while (i < aggregates.length) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+        i += 1
+      }
+
+      // trigger tumbling evaluation
+      if (!iterator.hasNext) {
+        val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
+
+        // adapted from SlidingEventTimeWindows.assignWindows
+        var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
+
+        // skip preparing output if it is not necessary
+        if (start > windowStart - windowSize) {
+
+          // set group keys
+          i = 0
+          while (i < groupingKeysLength) {
+            intermediateRow.setField(i, record.getField(i))
+            i += 1
+          }
+
+          // set accumulators
+          i = 0
+          while (i < aggregates.length) {
+            intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0))
+            i += 1
+          }
+
+          // adapted from SlidingEventTimeWindows.assignWindows
+          while (start > windowStart - windowSize) {
+            intermediateRow.setField(intermediateWindowStartPos, start)
+            out.collect(intermediateRow)
+            start -= windowSlide
+          }
+        }
+      }
+    }
+  }
+
+  override def combine(records: Iterable[Row]): Row = {
+
+    // reset first accumulator
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+      i += 1
+    }
+
+    val iterator = records.iterator()
+    while (iterator.hasNext) {
+      val record = iterator.next()
+
+      i = 0
+      while (i < aggregates.length) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+        i += 1
+      }
+
+      // check if this record is the last record
+      if (!iterator.hasNext) {
+
+        // set group keys
+        i = 0
+        while (i < groupingKeysLength) {
+          intermediateRow.setField(i, record.getField(i))
+          i += 1
+        }
+
+        // set accumulators
+        i = 0
+        while (i < aggregates.length) {
+          intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0))
+          i += 1
+        }
+
+        intermediateRow.setField(timeFieldPos, record.getField(timeFieldPos))
+
+        return intermediateRow
+      }
+    }
+
+    // this code path should never be reached as we return before the loop finishes
+    // we need this to prevent a compiler error
+    throw new IllegalArgumentException("Group is empty. This should never happen.")
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    returnType
+  }
+}
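
Taken together, the pre-tumbling above aggregates each (keys, aligned rowtime) pane once
and only then replicates the partial result into the sliding windows that cover it. A
rough sketch on plain collections with a sum aggregate, assuming slide-sized panes (i.e.
the window size is a multiple of the slide); the real operator works on merge-able
accumulators rather than raw values:

  val slide = 5L
  val size = 10L
  val rows = Seq(("a", 1L, 1), ("a", 2L, 2), ("a", 7L, 4)) // (key, rowtime, value)

  // pre-tumble: one partial aggregate per (key, pane start)
  val panes = rows
    .groupBy { case (k, t, _) => (k, t - t % slide) }
    .map { case (pane, rs) => (pane, rs.map(_._3).sum) }

  // replicate each pane result into all sliding windows covering it
  val windows = panes.toSeq
    .flatMap { case ((k, paneStart), partial) =>
      Iterator.iterate(paneStart)(_ - slide)
        .takeWhile(_ > paneStart - size)
        .map(start => ((k, start), partial)).toSeq
    }
    .groupBy(_._1)
    .map { case (w, ps) => (w, ps.map(_._2).sum) }
  // windows: ("a",0) -> 7, ("a",5) -> 4, ("a",-5) -> 3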

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
new file mode 100644
index 0000000..c11e86b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding windows on batch tables, for both time and count windows.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos window-start position relative to the last field of output row
+  * @param finalRowWindowEndPos window-end position relative to the last field of output row
+  * @param windowSize size of the window, used to determine the window-end of the output row
+  */
+class DataSetSlideWindowAggReduceCombineFunction(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    windowSize: Long)
+  extends DataSetSlideWindowAggReduceGroupFunction(
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    finalRowArity,
+    finalRowWindowStartPos,
+    finalRowWindowEndPos,
+    windowSize)
+  with CombineFunction[Row, Row] {
+
+  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
+  private val intermediateRow: Row = new Row(intermediateRowArity)
+
+  override def combine(records: Iterable[Row]): Row = {
+
+    // reset first accumulator
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+      i += 1
+    }
+
+    val iterator = records.iterator()
+    while (iterator.hasNext) {
+      val record = iterator.next()
+
+      // accumulate
+      i = 0
+      while (i < aggregates.length) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+        i += 1
+      }
+
+      // check if this record is the last record
+      if (!iterator.hasNext) {
+        // set group keys
+        i = 0
+        while (i < groupKeysMapping.length) {
+          intermediateRow.setField(i, record.getField(i))
+          i += 1
+        }
+
+        // set the partial accumulated result
+        i = 0
+        while (i < aggregates.length) {
+          intermediateRow.setField(groupKeysMapping.length + i, accumulatorList(i).get(0))
+          i += 1
+        }
+
+        intermediateRow.setField(windowStartPos, record.getField(windowStartPos))
+
+        return intermediateRow
+      }
+    }
+
+    // this code path should never be reached as we return before the loop finishes
+    // we need this to prevent a compiler error
+    throw new IllegalArgumentException("Group is empty. This should never happen.")
+  }
+}
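
The combinable variant above relies on accumulator merging being associative: partial
accumulators produced in combine() can be merged again in reduce() without changing the
result. A toy illustration with sums:

  val values = Seq(1, 2, 3, 4, 5)
  val partials = values.grouped(2).map(_.sum).toSeq // combine(): one partial per chunk
  assert(partials.sum == values.sum)                // reducing partials == reducing rows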

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..e67fac0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+  *
+  * It is used for sliding windows on batch tables, for both time and count windows.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos window-start position relative to the last field of output row
+  * @param finalRowWindowEndPos window-end position relative to the last field of output row
+  * @param windowSize size of the window, used to determine the window-end of the output row
+  */
+class DataSetSlideWindowAggReduceGroupFunction(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    windowSize: Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
+  private var collector: TimeWindowPropertyCollector = _
+  private var output: Row = _
+  private val accumulatorStartPos: Int = groupKeysMapping.length
+  protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
+
+  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+
+  override def open(config: Configuration) {
+    output = new Row(finalRowArity)
+    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+
+    // init lists with two empty accumulators
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+      i += 1
+    }
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // reset first accumulator
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+      i += 1
+    }
+
+    val iterator = records.iterator()
+    while (iterator.hasNext) {
+      val record = iterator.next()
+
+      // accumulate
+      i = 0
+      while (i < aggregates.length) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+        i += 1
+      }
+
+      // check if this record is the last record
+      if (!iterator.hasNext) {
+        // set group keys value to final output
+        i = 0
+        while (i < groupKeysMapping.length) {
+          val mapping = groupKeysMapping(i)
+          output.setField(mapping._1, record.getField(mapping._2))
+          i += 1
+        }
+
+        // get final aggregate value and set to output.
+        i = 0
+        while (i < aggregateMapping.length) {
+          val mapping = aggregateMapping(i)
+          val agg = aggregates(i)
+          val result = agg.getValue(accumulatorList(mapping._2).get(0))
+          output.setField(mapping._1, result)
+          i += 1
+        }
+
+        // add TimeWindow properties to the output row, then emit it
+        if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+          collector.wrappedCollector = out
+          collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long]
+          collector.windowEnd = collector.windowStart + windowSize
+
+          collector.collect(output)
+        } else {
+          out.collect(output)
+        }
+      }
+    }
+  }
+}
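
The reduce() above folds all rows of a group and emits a single result only once the
iterator is exhausted; the last record also supplies the group keys and the window start.
A small generic sketch of that shape (the names are illustrative):

  def reduceGroup[A, B](records: Iterator[A], zero: B)
                       (merge: (B, A) => B)
                       (emit: (B, A) => Unit): Unit = {
    var acc = zero
    var last: Option[A] = None
    for (r <- records) {
      last = Some(r)
      acc = merge(acc, r)
    }
    // keys and window start are taken from the last record of the group
    last.foreach(r => emit(acc, r))
  }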

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 85df1d8..ecc945c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -47,10 +47,8 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     private val finalRowArity: Int)
   extends RichGroupReduceFunction[Row, Row] {
 
-  private var aggregateBuffer: Row = _
   private var output: Row = _
   private val accumStartPos: Int = groupKeysMapping.length
-  private val intermediateRowArity: Int = accumStartPos + aggregates.length + 1
 
   val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
     new JArrayList[Accumulator](2)
@@ -59,7 +57,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
   override def open(config: Configuration) {
     Preconditions.checkNotNull(aggregates)
     Preconditions.checkNotNull(groupKeysMapping)
-    aggregateBuffer = new Row(intermediateRowArity)
     output = new Row(finalRowArity)
 
     // init lists with two empty accumulators

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 7ce0bf1..674c078 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -113,11 +113,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
 
     // get final aggregate value and set to output.
     aggregateMapping.foreach {
-      case (after, previous) => {
+      case (after, previous) =>
         val agg = aggregates(previous)
         val result = agg.getValue(accumulatorList(previous).get(0))
         output.setField(after, result)
-      }
     }
 
     // get window start timestamp

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
new file mode 100644
index 0000000..4a64c47
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions
+
+
+/**
+  * This map function only works for windows on batch tables. The difference between this function
+  * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is that this function
+  * appends an (aligned) rowtime field to the end of the output row.
+  */
+class DataSetWindowAggMapFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val groupingKeys: Array[Int],
+    private val timeFieldPos: Int, // time field position in input row
+    private val tumbleTimeWindowSize: Option[Long],
+    @transient private val returnType: TypeInformation[Row])
+  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
+
+  private var output: Row = _
+  // index of the rowtime field in the buffered output row
+  private var rowtimeIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(aggFields)
+    Preconditions.checkArgument(aggregates.length == aggFields.length)
+    // add one more field to store the rowtime
+    val partialRowLength = groupingKeys.length + aggregates.length + 1
+    // set rowtime to the last field of the output row
+    rowtimeIndex = partialRowLength - 1
+    output = new Row(partialRowLength)
+  }
+
+  override def map(input: Row): Row = {
+
+    for (i <- aggregates.indices) {
+      val agg = aggregates(i)
+      val fieldValue = input.getField(aggFields(i))
+      val accumulator = agg.createAccumulator()
+      agg.accumulate(accumulator, fieldValue)
+      output.setField(groupingKeys.length + i, accumulator)
+    }
+
+    for (i <- groupingKeys.indices) {
+      output.setField(i, input.getField(groupingKeys(i)))
+    }
+
+    val timeField = input.getField(timeFieldPos)
+    val rowtime = getTimestamp(timeField)
+    if (tumbleTimeWindowSize.isDefined) {
+      // for a tumbling time window, align the rowtime to the window start to represent the window
+      output.setField(
+        rowtimeIndex,
+        TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get))
+    } else {
+      // for session and sliding windows, keep the raw rowtime
+      output.setField(rowtimeIndex, rowtime)
+    }
+
+    output
+  }
+
+  private def getTimestamp(timeField: Any): Long = {
+    timeField match {
+      case b: Byte => b.toLong
+      case t: Character => t.toLong
+      case s: Short => s.toLong
+      case i: Int => i.toLong
+      case l: Long => l
+      case f: Float => f.toLong
+      case d: Double => d.toLong
+      case s: String => s.toLong
+      case t: Timestamp => SqlFunctions.toLong(t)
+      case _ =>
+        throw new RuntimeException(
+          s"Window time field doesn't support ${timeField.getClass} type currently")
+    }
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    returnType
+  }
+}
+
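
For tumbling windows the aligned rowtime written by this map function identifies the
window, so a plain grouping on that field performs the windowing. A minimal sketch of
the alignment, again assuming offset 0 and non-negative timestamps:

  def alignToTumble(rowtime: Long, size: Long): Long = rowtime - rowtime % size

  Seq(1L, 7L, 12L).map(alignToTumble(_, 5L)) // List(0, 5, 10): three distinct 5ms windows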

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
deleted file mode 100644
index 68088fc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
-
-/**
-  * This map function only works for windows on batch tables. The differences between this function
-  * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is this function
-  * append an (aligned) rowtime field to the end of the output row.
-  */
-class DataSetWindowAggregateMapFunction(
-    private val aggregates: Array[AggregateFunction[_]],
-    private val aggFields: Array[Int],
-    private val groupingKeys: Array[Int],
-    private val timeFieldPos: Int, // time field position in input row
-    private val tumbleTimeWindowSize: Option[Long],
-    @transient private val returnType: TypeInformation[Row])
-  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  private var output: Row = _
-  // rowtime index in the buffer output row
-  private var rowtimeIndex: Int = _
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.length == aggFields.length)
-    // add one more arity to store rowtime
-    val partialRowLength = groupingKeys.length + aggregates.length + 1
-    // set rowtime to the last field of the output row
-    rowtimeIndex = partialRowLength - 1
-    output = new Row(partialRowLength)
-  }
-
-  override def map(input: Row): Row = {
-
-    for (i <- aggregates.indices) {
-      val agg = aggregates(i)
-      val fieldValue = input.getField(aggFields(i))
-      val accumulator = agg.createAccumulator()
-      agg.accumulate(accumulator, fieldValue)
-      output.setField(groupingKeys.length + i, accumulator)
-    }
-
-    for (i <- groupingKeys.indices) {
-      output.setField(i, input.getField(groupingKeys(i)))
-    }
-
-    val timeField = input.getField(timeFieldPos)
-    val rowtime = getTimestamp(timeField)
-    if (tumbleTimeWindowSize.isDefined) {
-      // in case of tumble time window, align rowtime to window start to represent the window
-      output.setField(
-        rowtimeIndex,
-        TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get))
-    } else {
-      // for session window and slide window
-      output.setField(rowtimeIndex, rowtime)
-    }
-
-    output
-  }
-
-  private def getTimestamp(timeField: Any): Long = {
-    timeField match {
-      case b: Byte => b.toLong
-      case t: Character => t.toLong
-      case s: Short => s.toLong
-      case i: Int => i.toLong
-      case l: Long => l
-      case f: Float => f.toLong
-      case d: Double => d.toLong
-      case s: String => s.toLong
-      case t: Timestamp => t.getTime
-      case _ =>
-        throw new RuntimeException(
-          s"Window time field doesn't support ${timeField.getClass} type currently")
-    }
-  }
-
-  override def getProducedType: TypeInformation[Row] = {
-    returnType
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 00aba1f..13ac6a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
 
     if (iterator.hasNext) {
       val record = iterator.next()
-      out.collect(record)
+      var i = 0
+      while (i < record.getArity) {
+        output.setField(i, record.getField(i))
+        i += 1
+      }
+      out.collect(output)
     }
   }
 }
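
The change above copies the merged record into a function-owned row before emitting it,
rather than forwarding the input object directly; presumably this guards against the
reused input Row being mutated downstream. The copy itself is a plain field-by-field
transfer:

  import org.apache.flink.types.Row

  def copyInto(src: Row, dst: Row): Row = {
    var i = 0
    while (i < src.getArity) {
      dst.setField(i, src.getField(i))
      i += 1
    }
    dst
  }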

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 818cd0e..3e7b66b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -19,16 +19,16 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.table.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
 
@@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
       "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
-
-  @Test
-  def testEventTimeSlidingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
-      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
-      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
-      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
 }
 
-object GroupWindowITCase {
+object AggregationsITCase {
   class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
 
     override def checkAndGetNextWatermark(