You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:50:57 UTC
[4/4] flink git commit: [FLINK-6479] [table] Fix IOOBE in DataStreamGroupWindowAggregate.
[FLINK-6479] [table] Fix IndexOutOfBoundsException (IOOBE) in DataStreamGroupWindowAggregate.
This closes #3841.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f26a9116
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f26a9116
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f26a9116
Branch: refs/heads/master
Commit: f26a911627204519a78ccc57b1f12b387d85e43b
Parents: e0ab5f5
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 8 16:41:31 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:50:20 2017 +0200
----------------------------------------------------------------------
.../DataStreamGroupWindowAggregate.scala | 2 +-
.../table/GroupWindowAggregationsITCase.scala | 32 +++++++++++++++
.../scala/stream/table/GroupWindowTest.scala | 41 +++++++++++++++++++-
3 files changed, 73 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 1be1896..c38e5af 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -134,7 +134,7 @@ class DataStreamGroupWindowAggregate(
namedAggregates,
namedProperties)
- val keyedAggOpName = s"groupBy: (${groupingToString(schema.logicalType, grouping)}), " +
+ val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " +
s"window: ($window), " +
s"select: ($aggString)"
val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 2c027a9..846fe3e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -175,6 +175,38 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
"Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testGroupWindowWithoutKeyInProjection(): Unit = {
+ val data = List(
+ (1L, 1, "Hi", 1, 1),
+ (2L, 2, "Hello", 2, 2),
+ (4L, 2, "Hello", 2, 2),
+ (8L, 3, "Hello world", 3, 3),
+ (16L, 3, "Hello world", 3, 3))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
+
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+ .groupBy('w, 'int2, 'int3, 'string)
+ .select(weightAvgFun('long, 'int))
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq("12", "8", "2", "3", "1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
object GroupWindowAggregationsITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 0573ff3..ef071b7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.api.scala.stream.table
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.WindowReference
@@ -792,6 +792,45 @@ class GroupWindowTest extends TableTestBase {
}
@Test
+ def testSlidingWindowWithUDAF(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String, Int, Int)](
+ 'long,
+ 'int,
+ 'string,
+ 'int2,
+ 'int3,
+ 'proctime.proctime)
+
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+ .groupBy('w, 'int2, 'int3, 'string)
+ .select(weightAvgFun('long, 'int))
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ streamTableNode(0),
+ term("groupBy", "string, int2, int3"),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'proctime, 2.rows, 1.rows)),
+ term(
+ "select",
+ "string",
+ "int2",
+ "int3",
+ "WeightedAvg(long, int) AS TMP_0")
+ ),
+ term("select","TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected)
+ }
+
+ @Test
def testSlideWindowStartEnd(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)