You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/08 16:03:23 UTC
[1/2] flink git commit: [FLINK-5047] [table] Add sliding
group-windows for batch tables
Repository: flink
Updated Branches:
refs/heads/master bec818d84 -> 31a57c5a8
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index d57f4f7..77ea66e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -21,17 +21,16 @@ package org.apache.flink.table.runtime.dataset
import java.math.BigDecimal
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import scala.collection.JavaConverters._
@@ -197,4 +196,162 @@ class DataSetWindowAggregateITCase(
.toDataSet[Row]
}
+ // ----------------------------------------------------------------------------------------------
+ // Sliding windows
+ // ----------------------------------------------------------------------------------------------
+
+ @Test(expected = classOf[UnsupportedOperationException])
+ def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ // Count sliding group windows on event-time are currently not supported
+ table
+ .window(Slide over 2.rows every 2.rows on 'long as 'w)
+ .groupBy('w)
+ .select('int.count)
+ .toDataSet[Row]
+ }
+
+ @Test
+ def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+ // please keep this test in sync with the DataStream variant
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 2.milli on 'long as 'w)
+ .groupBy('w)
+ .select('int.count, 'w.start, 'w.end)
+
+ val expected =
+ "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+ "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+ "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
+ "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+ "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
+ "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
+ "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" +
+ "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+ "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+ val results = windowedTable.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataStream variant
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 10.milli every 5.milli on 'long as 'w)
+ .groupBy('string, 'w)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val expected =
+ "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+ "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+ "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+ "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
+ "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
+ "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+ "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+ "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+ "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
+
+ val results = windowedTable.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataStream variant
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 4.milli on 'long as 'w)
+ .groupBy('string, 'w)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val expected =
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+ "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+ "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+ "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+ "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+ "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+ val results = windowedTable.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataStream variant
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 10.milli on 'long as 'w)
+ .groupBy('string, 'w)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val expected =
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+ val results = windowedTable.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataStream variant
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env
+ .fromCollection(data)
+ .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 3.milli every 10.milli on 'long as 'w)
+ .groupBy('string, 'w)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val expected =
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003\n" +
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003"
+
+ val results = windowedTable.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
new file mode 100644
index 0000000..85a2373
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.runtime.datastream.DataStreamAggregateITCase.TimestampWithEqualWatermark
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
+
+ val data = List(
+ (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+ (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+ (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+ (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+ (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+ (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+ (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+ // ----------------------------------------------------------------------------------------------
+ // Sliding windows
+ // ----------------------------------------------------------------------------------------------
+
+ @Test
+ def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 2.milli on 'rowtime as 'w)
+ .groupBy('w)
+ .select('int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+ "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+ "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
+ "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+ "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
+ "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
+ "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
+ "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+ "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+ "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+ "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+ "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 4.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+ "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+ "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
+
+object DataStreamAggregateITCase {
+ class TimestampWithEqualWatermark
+ extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+ override def checkAndGetNextWatermark(
+ lastElement: (Long, Int, Double, Float, BigDecimal, String),
+ extractedTimestamp: Long)
+ : Watermark = {
+ new Watermark(extractedTimestamp)
+ }
+
+ override def extractTimestamp(
+ element: (Long, Int, Double, Float, BigDecimal, String),
+ previousElementTimestamp: Long): Long = {
+ element._1
+ }
+ }
+}
[2/2] flink git commit: [FLINK-5047] [table] Add sliding
group-windows for batch tables
Posted by tw...@apache.org.
[FLINK-5047] [table] Add sliding group-windows for batch tables
This closes #3364.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a57c5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a57c5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a57c5a
Branch: refs/heads/master
Commit: 31a57c5a89d6d22ccb629c2adfe4ffb87441e6dd
Parents: bec818d
Author: twalthr <tw...@apache.org>
Authored: Wed Jan 18 16:56:02 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Mar 8 17:01:27 2017 +0100
----------------------------------------------------------------------
.../table/functions/AggregateFunction.scala | 8 +-
.../nodes/dataset/DataSetWindowAggregate.scala | 117 ++++++++-
.../table/runtime/aggregate/AggregateUtil.scala | 236 ++++++++++++++++---
...SetSessionWindowAggReduceGroupFunction.scala | 201 ++++++++++++++++
...sionWindowAggregateReduceGroupFunction.scala | 201 ----------------
...taSetSlideTimeWindowAggFlatMapFunction.scala | 63 +++++
...tSlideTimeWindowAggReduceGroupFunction.scala | 202 ++++++++++++++++
...SetSlideWindowAggReduceCombineFunction.scala | 117 +++++++++
...taSetSlideWindowAggReduceGroupFunction.scala | 141 +++++++++++
...umbleCountWindowAggReduceGroupFunction.scala | 3 -
...TumbleTimeWindowAggReduceGroupFunction.scala | 3 +-
.../aggregate/DataSetWindowAggMapFunction.scala | 112 +++++++++
.../DataSetWindowAggregateMapFunction.scala | 111 ---------
.../IncrementalAggregateAllWindowFunction.scala | 7 +-
.../scala/stream/table/AggregationsITCase.scala | 43 +---
.../dataset/DataSetWindowAggregateITCase.scala | 163 ++++++++++++-
.../datastream/DataStreamAggregateITCase.scala | 235 ++++++++++++++++++
17 files changed, 1566 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
index 967d2ea..773c71f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
def getValue(accumulator: Accumulator): T
/**
- * Process the input values and update the provided accumulator instance.
+ * Processes the input values and updates the provided accumulator instance.
*
* @param accumulator the accumulator which contains the current
* aggregated results
@@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
def accumulate(accumulator: Accumulator, input: Any): Unit
/**
- * Merge a list of accumulator instances into one accumulator instance.
+ * Merges a list of accumulator instances into one accumulator instance.
*
- * IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the
+ * IMPORTANT: You may only return a new accumulator instance or the first accumulator of the
* input list. If you return another instance, the result of the aggregation function might be
* incorrect.
*
@@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
*
* @return The type information for the accumulator.
*/
- def getAccumulatorType(): TypeInformation[_] = null
+ def getAccumulatorType: TypeInformation[_] = null
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index fb5ff3b..a94deb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -111,17 +111,25 @@ class DataSetWindowAggregate(
// whether identifiers are matched case-sensitively
val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
+
window match {
case EventTimeTumblingGroupWindow(_, _, size) =>
createEventTimeTumblingWindowDataSet(
inputDS,
isTimeInterval(size.resultType),
caseSensitive)
+
case EventTimeSessionGroupWindow(_, _, gap) =>
createEventTimeSessionWindowDataSet(inputDS, caseSensitive)
- case EventTimeSlidingGroupWindow(_, _, _, _) =>
- throw new UnsupportedOperationException(
- "Event-time sliding windows in a batch environment are currently not supported")
+
+ case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+ createEventTimeSlidingWindowDataSet(
+ inputDS,
+ isTimeInterval(size.resultType),
+ asLong(size),
+ asLong(slide),
+ caseSensitive)
+
case _: ProcessingTimeGroupWindow =>
throw new UnsupportedOperationException(
"Processing-time tumbling windows are not supported in a batch environment, " +
@@ -130,7 +138,6 @@ class DataSetWindowAggregate(
}
}
-
private def createEventTimeTumblingWindowDataSet(
inputDS: DataSet[Row],
isTimeWindow: Boolean,
@@ -312,6 +319,108 @@ class DataSetWindowAggregate(
}
}
+ private def createEventTimeSlidingWindowDataSet(
+ inputDS: DataSet[Row],
+ isTimeWindow: Boolean,
+ size: Long,
+ slide: Long,
+ isParserCaseSensitive: Boolean)
+ : DataSet[Row] = {
+
+ // create MapFunction for initializing the aggregations
+ // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates
+ val mapFunction = createDataSetWindowPrepareMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType,
+ isParserCaseSensitive)
+
+ val mappedDataSet = inputDS
+ .map(mapFunction)
+ .name(prepareOperatorName)
+
+ val mapReturnType = mappedDataSet.getType
+
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val groupingKeys = grouping.indices.toArray
+
+ // do partial aggregation if possible
+ val isPartial = doAllSupportPartialMerge(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)
+
+ // only pre-tumble if it is worth it
+ val isLittleTumblingSize = determineLargestTumblingSize(size, slide) <= 1
+
+ val preparedDataSet = if (isTimeWindow) {
+ // time window
+
+ if (isPartial && !isLittleTumblingSize) {
+ // partial aggregates
+
+ val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
+
+ // create GroupReduceFunction
+ // for pre-tumbling and replicating/omitting the content for each pane
+ val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType,
+ isParserCaseSensitive)
+
+ mappedDataSet.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeysAndAlignedRowtime: _*)
+ .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
+ .name(prepareOperatorName)
+ } else {
+ // non-partial aggregates
+
+ // create FlatMapFunction
+ // for replicating/omitting the content for each pane
+ val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ mapReturnType,
+ isParserCaseSensitive)
+
+ mappedDataSet
+ .flatMap(prepareFlatMapFunction) // replicates/omits
+ }
+ } else {
+ // count window
+
+ throw new UnsupportedOperationException(
+ "Count sliding group windows on event-time are currently not supported.")
+ }
+
+ val prepareReduceReturnType = preparedDataSet.getType
+
+ // create GroupReduceFunction for final aggregation and conversion to output row
+ val aggregateReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties,
+ isInputCombined = false)
+
+ // gets the window-start position in the intermediate result.
+ val windowStartPos = prepareReduceReturnType.getArity - 1
+
+ val groupingKeysAndWindowStart = groupingKeys :+ windowStartPos
+
+ preparedDataSet
+ .groupBy(groupingKeysAndWindowStart: _*)
+ .reduceGroup(aggregateReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggregateOperatorName)
+ }
+
private def prepareOperatorName: String = {
val aggString = aggregationToString(
inputType,
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index acb6cd0..4900b1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -21,26 +21,26 @@ import java.util
import org.apache.calcite.rel.`type`._
import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
+import org.apache.flink.api.common.functions.{FlatMapFunction, GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.aggfunctions._
-import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
import org.apache.flink.types.Row
@@ -160,23 +160,37 @@ object AggregateUtil {
groupings,
aggregates,
inputType,
- Some(Array(Types.LONG)))
+ Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
val (timeFieldPos, tumbleTimeWindowSize) = window match {
+ case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
+ val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+ (timeFieldPos, Some(asLong(size)))
+
case EventTimeTumblingGroupWindow(_, time, size) =>
val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- size match {
- case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- (timeFieldPos, Some(value))
- case _ => (timeFieldPos, None)
- }
+ (timeFieldPos, None)
+
case EventTimeSessionGroupWindow(_, time, _) =>
- (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)
+ val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+ (timeFieldPos, None)
+
+ case EventTimeSlidingGroupWindow(_, time, size, slide)
+ if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
+ // pre-tumble incremental aggregates on time-windows
+ val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+ val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+ (timeFieldPos, Some(preTumblingSize))
+
+ case EventTimeSlidingGroupWindow(_, time, _, _) =>
+ val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+ (timeFieldPos, None)
+
case _ =>
throw new UnsupportedOperationException(s"$window is currently not supported on batch")
}
- new DataSetWindowAggregateMapFunction(
+ new DataSetWindowAggMapFunction(
aggregates,
aggFieldIndexes,
groupings,
@@ -186,6 +200,116 @@ object AggregateUtil {
}
/**
+ * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+ * partial aggregates of sliding windows (time and count-windows).
+ * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+ * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+ * window-start, and replicates or omits records for different panes of a sliding window.
+ *
+ * The output of the function contains the grouping keys, the intermediate aggregate values of
+ * all aggregate functions, and the aligned window start. The window start need not be a
+ * timestamp; it can also be a count value for count-windows.
+ *
+ * The output is stored in Row by the following format:
+ *
+ * {{{
+ * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5
+ * | |
+ * v v
+ * +---------+---------+--------+--------+--------+--------+-------------+
+ * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | windowStart |
+ * +---------+---------+--------+--------+--------+--------+-------------+
+ * ^ ^
+ * | |
+ * sum(y) aggOffsetInRow = 4 window start for pane mapping
+ * }}}
+ *
+ * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+ */
+ def createDataSetSlideWindowPrepareGroupReduceFunction(
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType,
+ isParserCaseSensitive: Boolean)
+ : RichGroupReduceFunction[Row, Row] = {
+
+ val aggregates = transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),
+ inputType,
+ needRetraction = false)._2
+
+ val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+ groupings,
+ aggregates,
+ inputType,
+ Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+ window match {
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ // sliding time-window for partial aggregations
+ new DataSetSlideTimeWindowAggReduceGroupFunction(
+ aggregates,
+ groupings.length,
+ returnType.getArity - 1,
+ asLong(size),
+ asLong(slide),
+ returnType)
+
+ case _ =>
+ throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+ }
+ }
+
+ /**
+ * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+ * non-incremental aggregates of sliding windows (time-windows).
+ *
+ * It requires a prepared input (with intermediate aggregate fields), aligns the
+ * window-start, and replicates or omits records for different panes of a sliding window.
+ *
+ * The output of the function contains the grouping keys, the intermediate aggregate values of
+ * all aggregate functions, and the aligned window start.
+ *
+ * The output is stored in Row by the following format:
+ *
+ * {{{
+ * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5
+ * | |
+ * v v
+ * +---------+---------+--------+--------+--------+--------+-------------+
+ * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | windowStart |
+ * +---------+---------+--------+--------+--------+--------+-------------+
+ * ^ ^
+ * | |
+ * sum(y) aggOffsetInRow = 4 window start for pane mapping
+ * }}}
+ *
+ * NOTE: this function is only used for time-based sliding windows on batch tables.
+ */
+ def createDataSetSlideWindowPrepareFlatMapFunction(
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: TypeInformation[Row],
+ isParserCaseSensitive: Boolean)
+ : FlatMapFunction[Row, Row] = {
+
+ window match {
+ case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ new DataSetSlideTimeWindowAggFlatMapFunction(
+ inputType.getArity - 1,
+ asLong(size),
+ asLong(slide),
+ inputType)
+
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"$window is currently not supported in a batch environment.")
+ }
+ }
+
+ /**
* Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute window
* aggregates on batch tables. If all aggregates support partial aggregation and is a time
* window, the [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
@@ -203,10 +327,10 @@ object AggregateUtil {
isInputCombined: Boolean = false)
: RichGroupReduceFunction[Row, Row] = {
- val aggregates = transformToAggregateFunctions(
+ val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- needRetraction = false)._2
+ needRetraction = false)
// the mapping relation between field index of intermediate aggregate Row and output Row.
val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
@@ -259,7 +383,7 @@ object AggregateUtil {
case EventTimeSessionGroupWindow(_, _, gap) =>
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
- new DataSetSessionWindowAggregateReduceGroupFunction(
+ new DataSetSessionWindowAggReduceGroupFunction(
aggregates,
groupingOffsetMapping,
aggOffsetMapping,
@@ -268,6 +392,42 @@ object AggregateUtil {
endPos,
asLong(gap),
isInputCombined)
+
+ case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ if (doAllSupportPartialMerge(aggregates)) {
+ // for partial aggregations
+ new DataSetSlideWindowAggReduceCombineFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ outputType.getFieldCount,
+ startPos,
+ endPos,
+ asLong(size))
+ }
+ else {
+ // for non-partial aggregations
+ new DataSetSlideWindowAggReduceGroupFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ outputType.getFieldCount,
+ startPos,
+ endPos,
+ asLong(size))
+ }
+
+ case EventTimeSlidingGroupWindow(_, _, size, _) =>
+ new DataSetSlideWindowAggReduceGroupFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ outputType.getFieldCount,
+ None,
+ None,
+ asLong(size))
+
case _ =>
throw new UnsupportedOperationException(s"$window is currently not supported on batch")
}
@@ -355,6 +515,7 @@ object AggregateUtil {
needRetraction = false)._2
window match {
+
case EventTimeSessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
createDataSetAggregateBufferDataType(
@@ -368,6 +529,7 @@ object AggregateUtil {
groupings,
asLong(gap),
combineReturnType)
+
case _ =>
throw new UnsupportedOperationException(
s" [ ${window.getClass.getCanonicalName.split("\\.").last} ] is currently not " +
@@ -662,7 +824,8 @@ object AggregateUtil {
}
val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
aggregateCall.getAggregation match {
- case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+
+ case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
if (needRetraction) {
aggregates(index) = sqlTypeName match {
case TINYINT =>
@@ -702,8 +865,8 @@ object AggregateUtil {
throw new TableException("Sum aggregate does no support type:" + sqlType)
}
}
- }
- case _: SqlAvgAggFunction => {
+
+ case _: SqlAvgAggFunction =>
aggregates(index) = sqlTypeName match {
case TINYINT =>
new ByteAvgAggFunction
@@ -722,8 +885,8 @@ object AggregateUtil {
case sqlType: SqlTypeName =>
throw new TableException("Avg aggregate does no support type:" + sqlType)
}
- }
- case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+
+ case sqlMinMaxFunction: SqlMinMaxAggFunction =>
aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
if (needRetraction) {
sqlTypeName match {
@@ -815,9 +978,10 @@ object AggregateUtil {
}
}
}
- }
+
case _: SqlCountAggFunction =>
aggregates(index) = new CountAggFunction
+
case unSupported: SqlAggFunction =>
throw new TableException("unsupported Function: " + unSupported.getName)
}
@@ -833,7 +997,7 @@ object AggregateUtil {
val aggTypes: Seq[TypeInformation[_]] =
aggregates.map {
agg =>
- val accType = agg.getAccumulatorType()
+ val accType = agg.getAccumulatorType
if (accType != null) {
accType
} else {
@@ -969,10 +1133,22 @@ object AggregateUtil {
}
}
- private def asLong(expr: Expression): Long = expr match {
+ private[flink] def asLong(expr: Expression): Long = expr match {
case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value
case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
case _ => throw new IllegalArgumentException()
}
+
+ private[flink] def determineLargestTumblingSize(size: Long, slide: Long): Long = {
+ if (slide > size) {
+ gcd(slide, size)
+ } else {
+ gcd(size, slide)
+ }
+ }
+
+ private def gcd(a: Long, b: Long): Long = {
+ if (b == 0) a else gcd(b, a % b)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..1f19687
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
+ * on batch.
+ *
+ * Note:
+ *
+ * This can handle two input types (depending on whether the input is combined or not):
+ *
+ * 1. when partial aggregate is not supported, the input data structure of reduce is
+ * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+ * 2. when partial aggregate is supported, the input data structure of reduce is
+ * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+ *
+ * @param aggregates The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ * and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and
+ * aggregated value index in output Row.
+ * @param finalRowArity The output row field count.
+ * @param finalRowWindowStartPos The relative window-start field position.
+ * @param finalRowWindowEndPos The relative window-end field position.
+ * @param gap Session time window gap.
+ */
+class DataSetSessionWindowAggReduceGroupFunction(
+ aggregates: Array[AggregateFunction[_ <: Any]],
+ groupKeysMapping: Array[(Int, Int)],
+ aggregateMapping: Array[(Int, Int)],
+ finalRowArity: Int,
+ finalRowWindowStartPos: Option[Int],
+ finalRowWindowEndPos: Option[Int],
+ gap: Long,
+ isInputCombined: Boolean)
+ extends RichGroupReduceFunction[Row, Row] {
+
+ private var aggregateBuffer: Row = _
+ private var output: Row = _
+ private var collector: TimeWindowPropertyCollector = _
+ private val accumStartPos: Int = groupKeysMapping.length
+ private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2
+ private val intermediateRowWindowStartPos = intermediateRowArity - 2
+ private val intermediateRowWindowEndPos = intermediateRowArity - 1
+
+ val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+ new JArrayList[Accumulator](2)
+ }
+
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+
+ // init lists with two empty accumulators
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).add(accumulator)
+ accumulatorList(i).add(accumulator)
+ }
+ }
+
+ /**
+ * For grouped intermediate aggregate Rows, divide window according to the window-start
+ * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
+ * aggregated values output from aggregate buffer, and then set them into output
+ * Row based on the mapping relationship between intermediate aggregate data and output data.
+ *
+ * @param records Grouped intermediate aggregate Rows iterator.
+ * @param out The collector to hand results to.
+ *
+ */
+ override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var windowStart: java.lang.Long = null
+ var windowEnd: java.lang.Long = null
+ var currentRowTime: java.lang.Long = null
+
+ // reset first accumulator in merge list
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).set(0, accumulator)
+ }
+
+ val iterator = records.iterator()
+
+ while (iterator.hasNext) {
+ val record = iterator.next()
+ currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+ // initial traversal or opening a new window
+ if (null == windowEnd ||
+ (null != windowEnd && currentRowTime > windowEnd)) {
+
+ // calculate the current window and open a new window
+ if (null != windowEnd) {
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
+
+ // reset first accumulator in list
+ for (i <- aggregates.indices) {
+ val accumulator = aggregates(i).createAccumulator()
+ accumulatorList(i).set(0, accumulator)
+ }
+ } else {
+ // set group keys value to final output.
+ groupKeysMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, record.getField(previous))
+ }
+ }
+
+ windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+ }
+
+ for (i <- aggregates.indices) {
+ // insert received accumulator into acc list
+ val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
+ accumulatorList(i).set(1, newAcc)
+ // merge acc list
+ val retAcc = aggregates(i).merge(accumulatorList(i))
+ // insert result into acc list
+ accumulatorList(i).set(0, retAcc)
+ }
+
+ windowEnd = if (isInputCombined) {
+ // partial aggregate is supported
+ record.getField(intermediateRowWindowEndPos).asInstanceOf[Long]
+ } else {
+ // partial aggregate is not supported, window-end equals rowtime + gap
+ currentRowTime + gap
+ }
+ }
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
+ }
+
+ /**
+ * Evaluate and emit the data of the current window.
+ *
+ * @param out the collector that receives the aggregate results
+ * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
+ * each aggregate
+ * @param windowStart the window's start attribute value is the min (rowtime) of all rows
+ * in the window.
+ * @param windowEnd the window's end property value is max (rowtime) + gap for all rows
+ * in the window.
+ */
+ def doEvaluateAndCollect(
+ out: Collector[Row],
+ accumulatorList: Array[JArrayList[Accumulator]],
+ windowStart: Long,
+ windowEnd: Long): Unit = {
+
+ // merge the accumulators and then get value for the final output
+ aggregateMapping.foreach {
+ case (after, previous) =>
+ val agg = aggregates(previous)
+ output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
+ }
+
+ // adds TimeWindow properties to output then emit output
+ if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+ collector.wrappedCollector = out
+ collector.windowStart = windowStart
+ collector.windowEnd = windowEnd
+
+ collector.collect(output)
+ } else {
+ out.collect(output)
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
deleted file mode 100644
index ebef211..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
- * It wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
- * on batch.
- *
- * Note:
- *
- * This can handle two input types (depending if input is combined or not):
- *
- * 1. when partial aggregate is not supported, the input data structure of reduce is
- * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
- * 2. when partial aggregate is supported, the input data structure of reduce is
- * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
- *
- * @param aggregates The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
- * and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and
- * aggregated value index in output Row.
- * @param finalRowArity The output row field count.
- * @param finalRowWindowStartPos The relative window-start field position.
- * @param finalRowWindowEndPos The relative window-end field position.
- * @param gap Session time window gap.
- */
-class DataSetSessionWindowAggregateReduceGroupFunction(
- aggregates: Array[AggregateFunction[_ <: Any]],
- groupKeysMapping: Array[(Int, Int)],
- aggregateMapping: Array[(Int, Int)],
- finalRowArity: Int,
- finalRowWindowStartPos: Option[Int],
- finalRowWindowEndPos: Option[Int],
- gap: Long,
- isInputCombined: Boolean)
- extends RichGroupReduceFunction[Row, Row] {
-
- private var aggregateBuffer: Row = _
- private var output: Row = _
- private var collector: TimeWindowPropertyCollector = _
- private val accumStartPos: Int = groupKeysMapping.length
- private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2
- private val intermediateRowWindowStartPos = intermediateRowArity - 2
- private val intermediateRowWindowEndPos = intermediateRowArity - 1
-
- val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
- new JArrayList[Accumulator](2)
- }
-
- override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
- aggregateBuffer = new Row(intermediateRowArity)
- output = new Row(finalRowArity)
- collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
-
- // init lists with two empty accumulators
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).add(accumulator)
- accumulatorList(i).add(accumulator)
- }
- }
-
- /**
- * For grouped intermediate aggregate Rows, divide window according to the window-start
- * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
- * aggregated values output from aggregate buffer, and then set them into output
- * Row based on the mapping relationship between intermediate aggregate data and output data.
- *
- * @param records Grouped intermediate aggregate Rows iterator.
- * @param out The collector to hand results to.
- *
- */
- override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
- var windowStart: java.lang.Long = null
- var windowEnd: java.lang.Long = null
- var currentRowTime: java.lang.Long = null
-
- // reset first accumulator in merge list
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- }
-
- val iterator = records.iterator()
-
- while (iterator.hasNext) {
- val record = iterator.next()
- currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
- // initial traversal or opening a new window
- if (null == windowEnd ||
- (null != windowEnd && currentRowTime > windowEnd)) {
-
- // calculate the current window and open a new window
- if (null != windowEnd) {
- // evaluate and emit the current window's result.
- doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
-
- // reset first accumulator in list
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- }
- } else {
- // set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, record.getField(previous))
- }
- }
-
- windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
- }
-
- for (i <- aggregates.indices) {
- // insert received accumulator into acc list
- val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
- accumulatorList(i).set(1, newAcc)
- // merge acc list
- val retAcc = aggregates(i).merge(accumulatorList(i))
- // insert result into acc list
- accumulatorList(i).set(0, retAcc)
- }
-
- windowEnd = if (isInputCombined) {
- // partial aggregate is supported
- record.getField(intermediateRowWindowEndPos).asInstanceOf[Long]
- } else {
- // partial aggregate is not supported, window-start equal rowtime + gap
- currentRowTime + gap
- }
- }
- // evaluate and emit the current window's result.
- doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
- }
-
- /**
- * Evaluate and emit the data of the current window.
- *
- * @param out the collection of the aggregate results
- * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
- * each aggregate
- * @param windowStart the window's start attribute value is the min (rowtime) of all rows
- * in the window.
- * @param windowEnd the window's end property value is max (rowtime) + gap for all rows
- * in the window.
- */
- def doEvaluateAndCollect(
- out: Collector[Row],
- accumulatorList: Array[JArrayList[Accumulator]],
- windowStart: Long,
- windowEnd: Long): Unit = {
-
- // merge the accumulators and then get value for the final output
- aggregateMapping.foreach {
- case (after, previous) =>
- val agg = aggregates(previous)
- output.setField(after, agg.getValue(accumulatorList(previous).get(0)))
- }
-
- // adds TimeWindow properties to output then emit output
- if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
- collector.wrappedCollector = out
- collector.windowStart = windowStart
- collector.windowEnd = windowEnd
-
- collector.collect(output)
- } else {
- out.collect(output)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
new file mode 100644
index 0000000..5f37b8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+ * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+ * aligns the window start, and replicates or omits records for different panes of a sliding
+ * window. It is used for non-partial aggregations.
+ *
+ * @param windowSize window size of the sliding window
+ * @param windowSlide window slide of the sliding window
+ * @param returnType return type of this function
+ */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+ private val timeFieldPos: Int,
+ private val windowSize: Long,
+ private val windowSlide: Long,
+ @transient private val returnType: TypeInformation[Row])
+ extends RichFlatMapFunction[Row, Row]
+ with ResultTypeQueryable[Row] {
+
+ override def flatMap(record: Row, out: Collector[Row]): Unit = {
+ val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
+
+ // adopted from SlidingEventTimeWindows.assignWindows
+ var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
+
+ // adopted from SlidingEventTimeWindows.assignWindows
+ while (start > windowStart - windowSize) {
+ record.setField(timeFieldPos, start)
+ out.collect(record)
+ start -= windowSlide
+ }
+ }
+
+ override def getProducedType: TypeInformation[Row] = {
+ returnType
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..5db3acb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * Pre-aggregating group-reduce (and combine) function for sliding time-windows on batch tables.
+  *
+  * It takes prepared input rows (with aligned rowtime for pre-tumbling), merges the accumulators
+  * of all rows of a group (pre-tumbling), aligns the window start to the window slide, and
+  * replicates or omits the merged row for the different panes of a sliding window.
+  *
+  * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], however,
+  * it does no final aggregate evaluation: the emitted rows still carry accumulators. It also
+  * includes the logic of [[DataSetSlideTimeWindowAggFlatMapFunction]].
+  *
+  * Rows are laid out as: grouping keys first, then one accumulator per aggregate function. The
+  * rows emitted by `reduce` carry the window start in their last field.
+  *
+  * NOTE(review): `combine` copies the time value at `timeFieldPos` while `reduce` writes the
+  * window start to the last field of the intermediate row; presumably `timeFieldPos` is that
+  * last field - confirm against the caller.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggReduceGroupFunction(
+    private val aggregates: Array[AggregateFunction[_ <: Any]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with CombineFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+
+  // reusable output row, created in open()
+  protected var intermediateRow: Row = _
+  // grouping keys + accumulators + one field to store window start
+  protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1
+  // per aggregate a two-element list: slot 0 = running result, slot 1 = accumulator to merge in
+  protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+
+  override def open(config: Configuration): Unit = {
+    intermediateRow = new Row(intermediateRowArity)
+
+    // fill both slots of every list so that set(0, ..) / set(1, ..) can be used afterwards
+    var aggIdx = 0
+    while (aggIdx < aggregates.length) {
+      val placeholder = aggregates(aggIdx).createAccumulator()
+      val slots = accumulatorList(aggIdx)
+      slots.add(placeholder)
+      slots.add(placeholder)
+      aggIdx += 1
+    }
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+    resetRunningAccumulators()
+
+    // merge all rows of the group and remember the last one
+    val iterator = records.iterator()
+    var last: Row = null
+    while (iterator.hasNext) {
+      last = iterator.next()
+      mergeAccumulatorsOf(last)
+    }
+
+    // trigger tumbling evaluation once the group is consumed (nothing to do for empty groups)
+    if (last != null) {
+      val timestamp = last.getField(timeFieldPos).asInstanceOf[Long]
+      val exclusiveLowerBound = timestamp - windowSize
+
+      // adopted from SlidingEventTimeWindows.assignWindows
+      var paneStart: Long = TimeWindow.getWindowStartWithOffset(timestamp, 0, windowSlide)
+
+      // skip preparing output if no row will be emitted
+      if (paneStart > exclusiveLowerBound) {
+        copyKeysAndAccumulators(last)
+
+        // adopted from SlidingEventTimeWindows.assignWindows
+        while (paneStart > exclusiveLowerBound) {
+          intermediateRow.setField(intermediateWindowStartPos, paneStart)
+          out.collect(intermediateRow)
+          paneStart -= windowSlide
+        }
+      }
+    }
+  }
+
+  override def combine(records: Iterable[Row]): Row = {
+    resetRunningAccumulators()
+
+    // merge all rows of the group and remember the last one
+    val iterator = records.iterator()
+    var last: Row = null
+    while (iterator.hasNext) {
+      last = iterator.next()
+      mergeAccumulatorsOf(last)
+    }
+
+    if (last == null) {
+      throw new IllegalArgumentException("Group is empty. This should never happen.")
+    }
+
+    copyKeysAndAccumulators(last)
+    // forward the time value of the last row unchanged
+    intermediateRow.setField(timeFieldPos, last.getField(timeFieldPos))
+    intermediateRow
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+
+  /** Replaces the running result (slot 0) of every aggregate with a fresh accumulator. */
+  private def resetRunningAccumulators(): Unit = {
+    var aggIdx = 0
+    while (aggIdx < aggregates.length) {
+      accumulatorList(aggIdx).set(0, aggregates(aggIdx).createAccumulator())
+      aggIdx += 1
+    }
+  }
+
+  /** Merges the accumulators carried by `record` into the running results (slot 0). */
+  private def mergeAccumulatorsOf(record: Row): Unit = {
+    var aggIdx = 0
+    while (aggIdx < aggregates.length) {
+      val slots = accumulatorList(aggIdx)
+      // insert received accumulator into slot 1, merge both slots, keep the result in slot 0
+      slots.set(1, record.getField(groupingKeysLength + aggIdx).asInstanceOf[Accumulator])
+      slots.set(0, aggregates(aggIdx).merge(slots))
+      aggIdx += 1
+    }
+  }
+
+  /** Copies the grouping keys of `record` and the running results into `intermediateRow`. */
+  private def copyKeysAndAccumulators(record: Row): Unit = {
+    var keyIdx = 0
+    while (keyIdx < groupingKeysLength) {
+      intermediateRow.setField(keyIdx, record.getField(keyIdx))
+      keyIdx += 1
+    }
+
+    var aggIdx = 0
+    while (aggIdx < aggregates.length) {
+      intermediateRow.setField(groupingKeysLength + aggIdx, accumulatorList(aggIdx).get(0))
+      aggIdx += 1
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
new file mode 100644
index 0000000..c11e86b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+
+/**
+  * Combinable variant of [[DataSetSlideWindowAggReduceGroupFunction]]. It wraps the aggregate
+  * logic inside of [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding on batch for both time and count-windows. The combine step merges the
+  * accumulators of all rows of a group into one intermediate row (group keys, accumulators,
+  * window start) without evaluating the final aggregate values.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceCombineFunction(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    windowSize: Long)
+  extends DataSetSlideWindowAggReduceGroupFunction(
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    finalRowArity,
+    finalRowWindowStartPos,
+    finalRowWindowEndPos,
+    windowSize)
+  with CombineFunction[Row, Row] {
+
+  // group keys + accumulators + one field for the window start
+  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
+  // reusable row returned by combine()
+  private val intermediateRow: Row = new Row(intermediateRowArity)
+
+  override def combine(records: Iterable[Row]): Row = {
+    val keyCount = groupKeysMapping.length
+    val aggCount = aggregates.length
+
+    // slot 0 of every accumulator list holds the running result; start from a fresh one
+    var idx = 0
+    while (idx < aggCount) {
+      accumulatorList(idx).set(0, aggregates(idx).createAccumulator())
+      idx += 1
+    }
+
+    // merge the accumulators of all rows and remember the last row of the group
+    val iterator = records.iterator()
+    var last: Row = null
+    while (iterator.hasNext) {
+      last = iterator.next()
+
+      idx = 0
+      while (idx < aggCount) {
+        val slots = accumulatorList(idx)
+        // received accumulator goes to slot 1, the merge result back to slot 0
+        slots.set(1, last.getField(keyCount + idx).asInstanceOf[Accumulator])
+        slots.set(0, aggregates(idx).merge(slots))
+        idx += 1
+      }
+    }
+
+    if (last == null) {
+      throw new IllegalArgumentException("Group is empty. This should never happen.")
+    }
+
+    // group keys are taken from the last row of the group
+    idx = 0
+    while (idx < keyCount) {
+      intermediateRow.setField(idx, last.getField(idx))
+      idx += 1
+    }
+
+    // partial accumulated results follow the keys
+    idx = 0
+    while (idx < aggCount) {
+      intermediateRow.setField(keyCount + idx, accumulatorList(idx).get(0))
+      idx += 1
+    }
+
+    // forward the window start unchanged
+    intermediateRow.setField(windowStartPos, last.getField(windowStartPos))
+
+    intermediateRow
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
new file mode 100644
index 0000000..e67fac0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+  *
+  * It is used for sliding on batch for both time and count-windows. All intermediate rows of a
+  * group are merged into one accumulator per aggregate function; after the last row a single
+  * output row with the group keys, the final aggregate values and (optionally) the window
+  * start/end is emitted.
+  *
+  * Intermediate rows are read as follows: accumulators start at index
+  * `groupKeysMapping.length` (one per aggregate function) and are followed by the window start.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  *                         and output Row, as (output index, intermediate index) pairs.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  *                         index in output Row, as (output index, aggregate index) pairs.
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceGroupFunction(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    windowSize: Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
+  // both are created in open()
+  private var collector: TimeWindowPropertyCollector = _
+  private var output: Row = _
+  // layout of the intermediate row: accumulators first, then the window start
+  private val accumulatorStartPos: Int = groupKeysMapping.length
+  protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
+
+  // per aggregate a two-element list: slot 0 = running result, slot 1 = accumulator to merge in
+  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+
+  override def open(config: Configuration) {
+    output = new Row(finalRowArity)
+    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+
+    // init lists with two empty accumulators so that set(0, ..) / set(1, ..) can be used later
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+      i += 1
+    }
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // reset first accumulator
+    var i = 0
+    while (i < aggregates.length) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+      i += 1
+    }
+
+    val iterator = records.iterator()
+    while (iterator.hasNext) {
+      val record = iterator.next()
+
+      // accumulate
+      i = 0
+      while (i < aggregates.length) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+        i += 1
+      }
+
+      // check if this record is the last record
+      if (!iterator.hasNext) {
+        // set group keys value to final output
+        i = 0
+        while (i < groupKeysMapping.length) {
+          val mapping = groupKeysMapping(i)
+          output.setField(mapping._1, record.getField(mapping._2))
+          i += 1
+        }
+
+        // get final aggregate value and set to output.
+        i = 0
+        while (i < aggregateMapping.length) {
+          val mapping = aggregateMapping(i)
+          // the accumulator must be evaluated by the aggregate function that produced it,
+          // so the function and its accumulator are both selected by the aggregate index
+          val aggIndex = mapping._2
+          val agg = aggregates(aggIndex)
+          val result = agg.getValue(accumulatorList(aggIndex).get(0))
+          output.setField(mapping._1, result)
+          i += 1
+        }
+
+        // adds TimeWindow properties to output then emit output
+        if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+          collector.wrappedCollector = out
+          collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long]
+          collector.windowEnd = collector.windowStart + windowSize
+
+          collector.collect(output)
+        } else {
+          out.collect(output)
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 85df1d8..ecc945c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -47,10 +47,8 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
private val finalRowArity: Int)
extends RichGroupReduceFunction[Row, Row] {
- private var aggregateBuffer: Row = _
private var output: Row = _
private val accumStartPos: Int = groupKeysMapping.length
- private val intermediateRowArity: Int = accumStartPos + aggregates.length + 1
val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
new JArrayList[Accumulator](2)
@@ -59,7 +57,6 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
override def open(config: Configuration) {
Preconditions.checkNotNull(aggregates)
Preconditions.checkNotNull(groupKeysMapping)
- aggregateBuffer = new Row(intermediateRowArity)
output = new Row(finalRowArity)
// init lists with two empty accumulators
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 7ce0bf1..674c078 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -113,11 +113,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
// get final aggregate value and set to output.
aggregateMapping.foreach {
- case (after, previous) => {
+ case (after, previous) =>
val agg = aggregates(previous)
val result = agg.getValue(accumulatorList(previous).get(0))
output.setField(after, result)
- }
}
// get window start timestamp
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
new file mode 100644
index 0000000..4a64c47
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions
+
+
+/**
+  * This map function only works for windows on batch tables. The difference between this
+  * function and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is that this
+  * function appends an (aligned) rowtime field to the end of the output row.
+  *
+  * The output row contains the grouping keys first, then one accumulator per aggregate function
+  * (each holding the single accumulated input value), and the rowtime as last field.
+  *
+  * @param aggregates aggregate functions
+  * @param aggFields input field position per aggregate function; must have the same length
+  *                  as `aggregates`
+  * @param groupingKeys positions of the grouping keys in the input row
+  * @param timeFieldPos time field position in input row
+  * @param tumbleTimeWindowSize if defined, the rowtime is aligned to the start of the tumbling
+  *                             window of this size; otherwise the rowtime is forwarded as is
+  * @param returnType return type of this function
+  */
+class DataSetWindowAggMapFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val groupingKeys: Array[Int],
+    private val timeFieldPos: Int, // time field position in input row
+    private val tumbleTimeWindowSize: Option[Long],
+    @transient private val returnType: TypeInformation[Row])
+  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
+
+  // reusable output row, created in open()
+  private var output: Row = _
+  // rowtime index in the buffer output row
+  private var rowtimeIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(aggFields)
+    Preconditions.checkArgument(aggregates.length == aggFields.length)
+    // add one more arity to store rowtime
+    val partialRowLength = groupingKeys.length + aggregates.length + 1
+    // set rowtime to the last field of the output row
+    rowtimeIndex = partialRowLength - 1
+    output = new Row(partialRowLength)
+  }
+
+  override def map(input: Row): Row = {
+
+    // one fresh accumulator per aggregate, fed with the value of this input row only
+    for (i <- aggregates.indices) {
+      val agg = aggregates(i)
+      val fieldValue = input.getField(aggFields(i))
+      val accumulator = agg.createAccumulator()
+      agg.accumulate(accumulator, fieldValue)
+      output.setField(groupingKeys.length + i, accumulator)
+    }
+
+    for (i <- groupingKeys.indices) {
+      output.setField(i, input.getField(groupingKeys(i)))
+    }
+
+    val timeField = input.getField(timeFieldPos)
+    val rowtime = getTimestamp(timeField)
+    if (tumbleTimeWindowSize.isDefined) {
+      // in case of tumble time window, align rowtime to window start to represent the window
+      output.setField(
+        rowtimeIndex,
+        TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get))
+    } else {
+      // for session window and slide window
+      output.setField(rowtimeIndex, rowtime)
+    }
+
+    output
+  }
+
+  /**
+    * Converts the value of the time field into a long timestamp.
+    *
+    * @param timeField value of the time field
+    * @return the timestamp as long
+    * @throws RuntimeException if the value is null or of an unsupported type
+    */
+  private def getTimestamp(timeField: Any): Long = {
+    timeField match {
+      // type patterns never match null; without this case the error message below would
+      // fail with a NullPointerException on timeField.getClass
+      case null =>
+        throw new RuntimeException("Window time field must not be null")
+      case b: Byte => b.toLong
+      case t: Character => t.toLong
+      case s: Short => s.toLong
+      case i: Int => i.toLong
+      case l: Long => l
+      case f: Float => f.toLong
+      case d: Double => d.toLong
+      case s: String => s.toLong
+      case t: Timestamp => SqlFunctions.toLong(t)
+      case _ =>
+        throw new RuntimeException(
+          s"Window time field doesn't support ${timeField.getClass} type currently")
+    }
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    returnType
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
deleted file mode 100644
index 68088fc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
-
-/**
- * This map function only works for windows on batch tables. The differences between this function
- * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is this function
- * append an (aligned) rowtime field to the end of the output row.
- */
-class DataSetWindowAggregateMapFunction(
- private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
- private val groupingKeys: Array[Int],
- private val timeFieldPos: Int, // time field position in input row
- private val tumbleTimeWindowSize: Option[Long],
- @transient private val returnType: TypeInformation[Row])
- extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
-
- private var output: Row = _
- // rowtime index in the buffer output row
- private var rowtimeIndex: Int = _
-
- override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(aggFields)
- Preconditions.checkArgument(aggregates.length == aggFields.length)
- // add one more arity to store rowtime
- val partialRowLength = groupingKeys.length + aggregates.length + 1
- // set rowtime to the last field of the output row
- rowtimeIndex = partialRowLength - 1
- output = new Row(partialRowLength)
- }
-
- override def map(input: Row): Row = {
-
- for (i <- aggregates.indices) {
- val agg = aggregates(i)
- val fieldValue = input.getField(aggFields(i))
- val accumulator = agg.createAccumulator()
- agg.accumulate(accumulator, fieldValue)
- output.setField(groupingKeys.length + i, accumulator)
- }
-
- for (i <- groupingKeys.indices) {
- output.setField(i, input.getField(groupingKeys(i)))
- }
-
- val timeField = input.getField(timeFieldPos)
- val rowtime = getTimestamp(timeField)
- if (tumbleTimeWindowSize.isDefined) {
- // in case of tumble time window, align rowtime to window start to represent the window
- output.setField(
- rowtimeIndex,
- TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get))
- } else {
- // for session window and slide window
- output.setField(rowtimeIndex, rowtime)
- }
-
- output
- }
-
- private def getTimestamp(timeField: Any): Long = {
- timeField match {
- case b: Byte => b.toLong
- case t: Character => t.toLong
- case s: Short => s.toLong
- case i: Int => i.toLong
- case l: Long => l
- case f: Float => f.toLong
- case d: Double => d.toLong
- case s: String => s.toLong
- case t: Timestamp => t.getTime
- case _ =>
- throw new RuntimeException(
- s"Window time field doesn't support ${timeField.getClass} type currently")
- }
- }
-
- override def getProducedType: TypeInformation[Row] = {
- returnType
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index 00aba1f..13ac6a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
if (iterator.hasNext) {
val record = iterator.next()
- out.collect(record)
+ var i = 0
+ while (i < record.getArity) {
+ output.setField(i, record.getField(i))
+ i += 1
+ }
+ out.collect(output)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 818cd0e..3e7b66b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -19,16 +19,16 @@
package org.apache.flink.table.api.scala.stream.table
import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit.Test
@@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
"Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
-
- @Test
- def testEventTimeSlidingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq(
- "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
- "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
- "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
- "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
- "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
- "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
}
-object GroupWindowITCase {
+object AggregationsITCase {
class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
override def checkAndGetNextWatermark(