You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:42 UTC
[08/15] flink git commit: [FLINK-6216] [table] Add non-windowed
GroupBy aggregation for streams.
http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..2c027a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Integration tests for group-window aggregations on streaming tables. Only some
+ * aggregations are tested until better testing of constructed DataStream programs is possible.
+ */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+ val data = List( // shared input rows, mapped to the fields ('long, 'int, 'string) below
+ (1L, 1, "Hi"),
+ (2L, 2, "Hello"),
+ (4L, 2, "Hello"),
+ (8L, 3, "Hello world"),
+ (16L, 3, "Hello world"))
+
+ @Test // sliding count window (2 rows, slide 1 row) per 'string key on processing time
+ def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList() // start from an empty result list
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('int), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink) // presumably fills StreamITCase.testResults - confirm
+ env.execute()
+
+ val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
+ "Hello,2,2,3,2", "Hi,1,1,1,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted) // sorted: order is not asserted
+ }
+
+ @Test // session windows (5 ms gap) per 'string key on event time
+ def testEventTimeSessionGroupWindowOverTime(): Unit = {
+ // To verify the "merge" functionality, we create this test with the following characteristics:
+ // 1. set the parallelism to 1, and have the test data out of order
+ // 2. create a watermark with 10ms offset to delay the window emission by 10ms
+ val sessionWindowTestdata = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (8L, 8, "Hello"),
+ (9L, 9, "Hello World"),
+ (4L, 4, "Hello"), // out of order: timestamp 4 is listed after 8 and 9
+ (16L, 16, "Hello"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList() // start from an empty result list
+
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvgWithMerge
+
+ val stream = env
+ .fromCollection(sessionWindowTestdata)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L)) // watermark = ts - 10
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Session withGap 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('int), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink) // presumably fills StreamITCase.testResults - confirm
+ env.execute()
+
+ val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted) // sorted: order is not asserted
+ }
+
+ @Test // tumbling count window of 2 rows over all rows (window is the only group key)
+ def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList() // start from an empty result list
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Tumble over 2.rows on 'proctime as 'w)
+ .groupBy('w)
+ .select(countFun('string), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink) // presumably fills StreamITCase.testResults - confirm
+ env.execute()
+
+ val expected = Seq("2,1,1,1", "2,2,6,2") // 5 input rows; presumably the 5th never fills a window
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted) // sorted: order is not asserted
+ }
+
+ @Test // tumbling 5 ms event-time window per 'string key, also selecting window start/end
+ def testEventTimeTumblingWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // NOTE(review): no setParallelism(1) here, unlike the other tests - confirm intended
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList() // start from an empty result list
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L)) // watermark = ts - 0
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink) // presumably fills StreamITCase.testResults - confirm
+ env.execute()
+
+ val expected = Seq(
+ "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+ "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+ "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted) // sorted: order is not asserted
+ }
+}
+
+object GroupWindowAggregationsITCase { // companion object holding the watermark helper used by the tests
+ class TimestampAndWatermarkWithOffset( // assigns timestamps from tuples and builds watermarks from them
+ offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { // offset: subtracted from the timestamp to form the watermark
+
+ override def checkAndGetNextWatermark( // returns a new watermark on every call, never null
+ lastElement: (Long, Int, String),
+ extractedTimestamp: Long)
+ : Watermark = {
+ new Watermark(extractedTimestamp - offset) // watermark trails the extracted timestamp by offset
+ }
+
+ override def extractTimestamp( // previousElementTimestamp is ignored
+ element: (Long, Int, String),
+ previousElementTimestamp: Long): Long = {
+ element._1 // the first tuple field is the timestamp
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index de6cbfa..0573ff3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -185,11 +185,11 @@ class GroupWindowTest extends TableTestBase {
.select('string.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -229,7 +229,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -259,7 +259,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -268,10 +268,7 @@ class GroupWindowTest extends TableTestBase {
term("groupBy", "string"),
term(
"window",
- TumblingGroupWindow(
- WindowReference("w"),
- 'proctime,
- 2.rows)),
+ TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -289,7 +286,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
term(
@@ -317,7 +314,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, weightedAvg('long, 'int))
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
term(
@@ -343,7 +340,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -374,7 +371,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -397,6 +394,32 @@ class GroupWindowTest extends TableTestBase {
@Test
def testEventTimeSlidingGroupWindowOverTime(): Unit = { // plan check: keyed sliding window on 'rowtime
val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "string", "int", "rowtime") // expected Calc keeps string, int, rowtime; 'long is not listed
+ ),
+ term("groupBy", "string"),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+ term("select", "string", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected) // presumably compares the translated plan with expected - confirm
+ }
+
+ @Test
+ @Ignore // see comments in DataStreamGroupWindowAggregate
+ def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+ val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
val windowedTable = table
@@ -405,7 +428,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
term(
@@ -434,12 +457,10 @@ class GroupWindowTest extends TableTestBase {
.select('string, weightedAvg('long, 'int))
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
- term(
- "window",
- SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
)
@@ -457,15 +478,10 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
- term(
- "window",
- SessionGroupWindow(
- WindowReference("w"),
- 'long,
- 7.milli)),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -485,12 +501,10 @@ class GroupWindowTest extends TableTestBase {
.select('string, weightedAvg('long, 'int))
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
- term(
- "window",
- SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
+ term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
)
@@ -508,7 +522,7 @@ class GroupWindowTest extends TableTestBase {
.select('string, 'int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -538,7 +552,7 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -559,6 +573,31 @@ class GroupWindowTest extends TableTestBase {
@Test
def testAllEventTimeTumblingGroupWindowOverTime(): Unit = { // plan check: tumbling window on 'rowtime, window is the only group key
val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "int", "rowtime") // expected Calc keeps only int and rowtime
+ ),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)), // no groupBy term is expected here
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected) // presumably compares the translated plan with expected - confirm
+ }
+
+ @Test
+ @Ignore // see comments in DataStreamGroupWindowAggregate
+ def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+ val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
val windowedTable = table
@@ -567,7 +606,7 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -596,7 +635,7 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -626,7 +665,7 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -648,6 +687,31 @@ class GroupWindowTest extends TableTestBase {
@Test
def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { // plan check: sliding window on 'rowtime, window is the only group key
val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w)
+ .select('int.count)
+
+ val expected = unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "int", "rowtime") // expected Calc keeps only int and rowtime
+ ),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), // no groupBy term is expected here
+ term("select", "COUNT(int) AS TMP_0")
+ )
+
+ util.verifyTable(windowedTable, expected) // presumably compares the translated plan with expected - confirm
+ }
+
+ @Test
+// @Ignore // see comments in DataStreamGroupWindowAggregate
+ def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+ val util = streamTestUtil()
val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
val windowedTable = table
@@ -656,19 +720,13 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
term("select", "int", "long")
),
- term(
- "window",
- SlidingGroupWindow(
- WindowReference("w"),
- 'long,
- 8.milli,
- 10.milli)),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
term("select", "COUNT(int) AS TMP_0")
)
@@ -686,7 +744,7 @@ class GroupWindowTest extends TableTestBase {
.select('int.count)
val expected = unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
unaryNode(
"DataStreamCalc",
streamTableNode(0),
@@ -707,22 +765,22 @@ class GroupWindowTest extends TableTestBase {
@Test
def testTumbleWindowStartEnd(): Unit = {
val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
val windowedTable = table
- .window(Tumble over 5.milli on 'long as 'w)
+ .window(Tumble over 5.milli on 'rowtime as 'w)
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
val expected = unaryNode(
- "DataStreamAggregate",
- streamTableNode(0),
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "string", "int", "rowtime")
+ ),
term("groupBy", "string"),
- term("window",
- TumblingGroupWindow(
- WindowReference("w"),
- 'long,
- 5.milli)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)),
term("select",
"string",
"COUNT(int) AS TMP_0",
@@ -736,23 +794,22 @@ class GroupWindowTest extends TableTestBase {
@Test
def testSlideWindowStartEnd(): Unit = {
val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
val windowedTable = table
- .window(Slide over 10.milli every 5.milli on 'long as 'w)
+ .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
val expected = unaryNode(
- "DataStreamAggregate",
- streamTableNode(0),
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "string", "int", "rowtime")
+ ),
term("groupBy", "string"),
- term("window",
- SlidingGroupWindow(
- WindowReference("w"),
- 'long,
- 10.milli,
- 5.milli)),
+ term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)),
term("select",
"string",
"COUNT(int) AS TMP_0",
@@ -776,14 +833,10 @@ class GroupWindowTest extends TableTestBase {
val expected = unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
- term("window",
- SessionGroupWindow(
- WindowReference("w"),
- 'long,
- 3.milli)),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)),
term("select",
"string",
"COUNT(int) AS TMP_1",
@@ -810,14 +863,10 @@ class GroupWindowTest extends TableTestBase {
val expected = unaryNode(
"DataStreamCalc",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string"),
- term("window",
- TumblingGroupWindow(
- WindowReference("w"),
- 'long,
- 5.millis)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)),
term("select",
"string",
"SUM(int) AS TMP_0",
http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
index b6a6660..c72249a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -28,13 +28,6 @@ import org.junit.Test
class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
@Test(expected = classOf[ValidationException])
- def testSelectWithAggregation(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
- }
-
- @Test(expected = classOf[ValidationException])
def testDistinct(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)