You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:42 UTC

[08/15] flink git commit: [FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..2c027a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
+                       "Hello,2,2,3,2", "Hi,1,1,1,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    // To verify the "merge" functionality, we create this test with the following characteristics:
+    // 1. set the parallelism to 1, and have the test data out of order
+    // 2. create a watermark with a 10ms offset to delay the window emission by 10ms
+    val sessionWindowTestdata = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (8L, 8, "Hello"),
+      (9L, 9, "Hello World"),
+      (4L, 4, "Hello"),
+      (16L, 16, "Hello"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvgWithMerge
+
+    val stream = env
+      .fromCollection(sessionWindowTestdata)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Session withGap 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w)
+      .select(countFun('string), 'int.avg,
+              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("2,1,1,1", "2,2,6,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
+              weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowAggregationsITCase {
+  class TimestampAndWatermarkWithOffset(
+    offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp - offset)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index de6cbfa..0573ff3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -185,11 +185,11 @@ class GroupWindowTest extends TableTestBase {
       .select('string.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
@@ -229,7 +229,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -259,7 +259,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -268,10 +268,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'proctime,
-          2.rows)),
+        TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -289,7 +286,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
       term(
@@ -317,7 +314,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, weightedAvg('long, 'int))
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
       term(
@@ -343,7 +340,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -374,7 +371,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -397,6 +394,32 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
+      term("groupBy", "string"),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
@@ -405,7 +428,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
       term(
@@ -434,12 +457,10 @@ class GroupWindowTest extends TableTestBase {
       .select('string, weightedAvg('long, 'int))
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
-      term(
-        "window",
-        SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -457,15 +478,10 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
-      term(
-        "window",
-        SessionGroupWindow(
-          WindowReference("w"),
-          'long,
-          7.milli)),
+      term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -485,12 +501,10 @@ class GroupWindowTest extends TableTestBase {
       .select('string, weightedAvg('long, 'int))
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
-      term(
-        "window",
-        SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
+      term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -508,7 +522,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -538,7 +552,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -559,6 +573,31 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "rowtime")
+      ),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamGroupWindowAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
@@ -567,7 +606,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -596,7 +635,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -626,7 +665,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -648,6 +687,31 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int", "rowtime")
+      ),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  // @Ignore is disabled here (TODO confirm); see comments in DataStreamGroupWindowAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
@@ -656,19 +720,13 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
         term("select", "int", "long")
       ),
-      term(
-        "window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'long,
-          8.milli,
-          10.milli)),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -686,7 +744,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -707,22 +765,22 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testTumbleWindowStartEnd(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     val windowedTable = table
-      .window(Tumble over 5.milli on 'long as 'w)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
-      streamTableNode(0),
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
       term("groupBy", "string"),
-      term("window",
-        TumblingGroupWindow(
-          WindowReference("w"),
-          'long,
-          5.milli)),
+      term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
       term("select",
         "string",
         "COUNT(int) AS TMP_0",
@@ -736,23 +794,22 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testSlideWindowStartEnd(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'long as 'w)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
-      streamTableNode(0),
+      "DataStreamGroupWindowAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int", "rowtime")
+      ),
       term("groupBy", "string"),
-      term("window",
-        SlidingGroupWindow(
-          WindowReference("w"),
-          'long,
-          10.milli,
-          5.milli)),
+      term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
       term("select",
         "string",
         "COUNT(int) AS TMP_0",
@@ -776,14 +833,10 @@ class GroupWindowTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       unaryNode(
-        "DataStreamAggregate",
+        "DataStreamGroupWindowAggregate",
         streamTableNode(0),
         term("groupBy", "string"),
-        term("window",
-          SessionGroupWindow(
-            WindowReference("w"),
-            'long,
-            3.milli)),
+        term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
         term("select",
           "string",
           "COUNT(int) AS TMP_1",
@@ -810,14 +863,10 @@ class GroupWindowTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       unaryNode(
-        "DataStreamAggregate",
+        "DataStreamGroupWindowAggregate",
         streamTableNode(0),
         term("groupBy", "string"),
-        term("window",
-          TumblingGroupWindow(
-            WindowReference("w"),
-            'long,
-            5.millis)),
+        term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
         term("select",
           "string",
           "SUM(int) AS TMP_0",

http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
index b6a6660..c72249a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -28,13 +28,6 @@ import org.junit.Test
 class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
 
   @Test(expected = classOf[ValidationException])
-  def testSelectWithAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)