You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:50:57 UTC

[4/4] flink git commit: [FLINK-6479] [table] Fix IOOBE in DataStreamGroupWindowAggregate.

[FLINK-6479] [table] Fix IOOBE in DataStreamGroupWindowAggregate.

This closes #3841.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f26a9116
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f26a9116
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f26a9116

Branch: refs/heads/master
Commit: f26a911627204519a78ccc57b1f12b387d85e43b
Parents: e0ab5f5
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 8 16:41:31 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:50:20 2017 +0200

----------------------------------------------------------------------
 .../DataStreamGroupWindowAggregate.scala        |  2 +-
 .../table/GroupWindowAggregationsITCase.scala   | 32 +++++++++++++++
 .../scala/stream/table/GroupWindowTest.scala    | 41 +++++++++++++++++++-
 3 files changed, 73 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 1be1896..c38e5af 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -134,7 +134,7 @@ class DataStreamGroupWindowAggregate(
       namedAggregates,
       namedProperties)
 
-    val keyedAggOpName = s"groupBy: (${groupingToString(schema.logicalType, grouping)}), " +
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " +
       s"window: ($window), " +
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"

http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 2c027a9..846fe3e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -175,6 +175,38 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testGroupWindowWithoutKeyInProjection(): Unit = {
+    val data = List(
+      (1L, 1, "Hi", 1, 1),
+      (2L, 2, "Hello", 2, 2),
+      (4L, 2, "Hello", 2, 2),
+      (8L, 3, "Hello world", 3, 3),
+      (16L, 3, "Hello world", 3, 3))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("12", "8", "2", "3", "1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object GroupWindowAggregationsITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 0573ff3..ef071b7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.WindowReference
@@ -792,6 +792,45 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
+  def testSlidingWindowWithUDAF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String, Int, Int)](
+      'long,
+      'int,
+      'string,
+      'int2,
+      'int3,
+      'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          streamTableNode(0),
+          term("groupBy", "string, int2, int3"),
+          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime,  2.rows, 1.rows)),
+          term(
+            "select",
+            "string",
+            "int2",
+            "int3",
+            "WeightedAvg(long, int) AS TMP_0")
+        ),
+        term("select","TMP_0")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
   def testSlideWindowStartEnd(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)