Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:39 UTC
[10/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
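For context, this commit moves the Table API sources and tests out of the org.apache.flink.api.* namespace. As a minimal, illustrative sketch (the new package names below are an assumption inferred from the refactoring, not quoted from this diff), the imports of a test such as CalcITCase would change roughly like this:

    // Old layout, as seen in the deleted files below
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment

    // New layout under org.apache.flink.table.* (assumed target packages)
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.TableEnvironment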
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
deleted file mode 100644
index 3eee4d4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{TableEnvironment, TableException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testSimpleSelectAll(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSelectFirst(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("1", "2", "3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectWithNaming(): Unit = {
-
- // verify ProjectMergeRule.
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
- .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
- .select('a, 'b)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
- "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
- "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectAllWithAs(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('a, 'b, 'c)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
- def testAsWithToFewFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
- def testAsWithToManyFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
- def testAsWithAmbiguousFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-
- @Test(expected = classOf[TableException])
- def testOnlyFieldRefInAs(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleFilter(): Unit = {
- /*
- * Test simple filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter('a === 3)
- val results = filterDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllRejectingFilter(): Unit = {
- /*
- * Test all-rejecting filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(false) )
- val results = filterDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
- /*
- * Test all-passing filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(true) )
- val results = filterDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testFilterOnIntegerTupleField(): Unit = {
- /*
- * Test filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 === 0 )
- val results = filterDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "2,2,Hello", "4,3,Hello world, how are you?",
- "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
- "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
- "18,6,Comment#12", "20,6,Comment#14")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testNotEquals(): Unit = {
- /*
- * Test not-equals filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 !== 0)
- val results = filterDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
- val expected = mutable.MutableList(
- "1,1,Hi", "3,2,Hello world",
- "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
- "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
- "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
deleted file mode 100644
index 9c2d6b3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
+++ /dev/null
@@ -1,734 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{RowtimeAttribute, WindowReference}
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.junit.{Ignore, Test}
-
-class GroupWindowTest extends TableTestBase {
-
- // batch windows are not supported yet
- @Test(expected = classOf[ValidationException])
- def testInvalidBatchWindow(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Session withGap 100.milli as 'string)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidWindowProperty(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .select('string, 'string.start) // property in non windowed table
- }
-
- @Test(expected = classOf[TableException])
- def testInvalidRowtime1(): Unit = {
- val util = streamTestUtil()
- // rowtime attribute must not be a field name
- util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidRowtime2(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidRowtime3(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidRowtime4(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- // only rowtime is a valid time attribute in a stream environment
- .window(Tumble over 50.milli on 'string)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidTumblingSize(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Tumble over "WRONG") // string is not a valid interval
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidSlidingSize(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Slide over "WRONG" every "WRONG") // string is not a valid interval
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidSlidingSlide(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidSessionGap(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Session withGap 10.rows) // row interval is not valid for session windows
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidWindowAlias1(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidWindowAlias2(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .groupBy('string)
- .window(Session withGap 100.milli as 'string) // field name "string" is already present
- .select('string, 'int.count)
- }
-
- @Test
- def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 50.milli)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 2.rows)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeTumblingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 5.milli on 'rowtime)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- @Ignore // see comments in DataStreamAggregate
- def testEventTimeTumblingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 2.rows on 'rowtime)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- streamTableNode(0),
- term("groupBy", "string"),
- term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 50.milli every 50.milli)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 2.rows every 1.rows)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 8.milli every 10.milli on 'rowtime)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- @Ignore // see comments in DataStreamAggregate
- def testEventTimeSlidingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 2.rows every 1.rows on 'rowtime)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- streamTableNode(0),
- term("groupBy", "string"),
- term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Session withGap 7.milli on 'rowtime)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 50.milli)
- .select('string, 'int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'rowtime)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- @Ignore // see comments in DataStreamAggregate
- def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows on 'rowtime)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
-
- @Test
- def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 50.milli every 50.milli)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 8.milli every 10.milli on 'rowtime)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- @Ignore // see comments in DataStreamAggregate
- def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows on 'rowtime)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Session withGap 7.milli on 'rowtime)
- .select('int.count)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "int")
- ),
- term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
- term("select", "COUNT(int) AS TMP_0")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testTumbleWindowStartEnd(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 5.milli on 'rowtime as 'w)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window",
- EventTimeTumblingGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 5.milli)),
- term("select",
- "string",
- "COUNT(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testSlideWindowStartEnd(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val expected = unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window",
- EventTimeSlidingGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 10.milli,
- 5.milli)),
- term("select",
- "string",
- "COUNT(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testSessionWindowStartWithTwoEnd(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Session withGap 3.milli on 'rowtime as 'w)
- .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window",
- EventTimeSessionGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 3.milli)),
- term("select",
- "string",
- "COUNT(int) AS TMP_1",
- "end(WindowReference(w)) AS TMP_0",
- "start(WindowReference(w)) AS TMP_2")
- ),
- term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-
- @Test
- def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 5.millis on 'rowtime as 'w)
- .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
- 'w.end as 'x3, 'w.end)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "string", "int")
- ),
- term("groupBy", "string"),
- term("window",
- EventTimeTumblingGroupWindow(
- Some(WindowReference("w")),
- RowtimeAttribute(),
- 5.millis)),
- term("select",
- "string",
- "SUM(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
- ),
- term("select",
- "string",
- "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
- "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
- "TMP_1 AS x",
- "TMP_1 AS x2",
- "TMP_2 AS x3",
- "TMP_2 AS TMP_5")
- )
-
- util.verifyTable(windowedTable, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
deleted file mode 100644
index 5096b53..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UnionITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
-
- val unionDs = ds1.unionAll(ds2).select('c)
-
- val results = unionDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnionWithFilter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
- val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
- val results = unionDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("Hi", "Hallo")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionFieldsNameNotOverlap1(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
- val unionDs = ds1.unionAll(ds2)
-
- val results = unionDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionFieldsNameNotOverlap2(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .select('a, 'b, 'c)
-
- val unionDs = ds1.unionAll(ds2)
-
- val results = unionDs.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- println(StreamITCase.testResults)
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionTablesFromDifferentEnvs(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
- val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- ds1.unionAll(ds2)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
deleted file mode 100644
index 6d1a62e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-
-class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testSelectWithAggregation(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
- }
-
- @Test(expected = classOf[ValidationException])
- def testDistinct(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
- }
-
- @Test(expected = classOf[ValidationException])
- def testSort(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.join(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.union(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersect(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.intersect(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersectAll(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.intersectAll(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinus(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.minus(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinusAll(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.minusAll(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testLimit(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.limit(0,5)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 305f1db..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{TableEnvironment, TableException, Types, ValidationException}
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.utils._
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
-import org.junit.Assert.{assertTrue, fail}
-import org.junit.Test
-import org.mockito.Mockito._
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testJavaScalaTableAPIEquality(): Unit = {
- // mock
- val ds = mock(classOf[DataStream[Row]])
- val jDs = mock(classOf[JDataStream[Row]])
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- when(ds.javaStream).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
- // test cross join
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join("func1(c).as(s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test left outer join
- scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'len, 'adult)
- javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.join(pojo('c))
- .select('c, 'name, 'age)
- javaTable = in2.join("pojo(c)")
- .select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = in1.join(func2('c) as ('name, 'len))
- .select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join("func2(c) as (name, len)")
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = in1.join(func1('c.substring(2)) as 's)
- .select('a, 'c, 's)
- javaTable = in2.join("func1(substring(c, 2)) as (s)")
- .select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // check scala object is forbidden
- expectExceptionThrown(
- tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
- expectExceptionThrown(
- javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
- expectExceptionThrown(
- in1.join(ObjectTableFunction('a, 1)), "Scala object")
-
- }
-
- @Test
- def testInvalidTableFunction(): Unit = {
- // mock
- val util = streamTestUtil()
- val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
-
- //=================== check scala object is forbidden =====================
- // Scala table environment register
- expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
- // Java table environment register
- expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
- // Scala Table API directly call
- expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
-
-
- //============ throw exception when table function is not registered =========
- // Java Table API call
- expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
- // SQL API call
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
- "No match found for function signature nonexist(<NUMERIC>)")
-
-
- //========= throw exception when the called function is a scalar function ====
- util.addFunction("func0", Func0)
- // Java Table API call
- expectExceptionThrown(
- t.join("func0(a)"),
- "only accept expressions that define table functions",
- classOf[TableException])
- // SQL API call
- // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
- null,
- classOf[AssertionError])
-
- //========== throw exception when the parameters is not correct ===============
- // Java Table API call
- util.addFunction("func2", new TableFunc2)
- expectExceptionThrown(
- t.join("func2(c, c)"),
- "Given parameters of function 'FUNC2' do not match any signature")
- // SQL API call
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
- "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
- }
-
- @Test
- def testCrossJoin(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result1 = table.join(function('c) as 's).select('c, 's)
-
- val expected1 = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result1, expected1)
-
- // test overloading
-
- val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
- val expected2 = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2, '$$')"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result2, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testCustomType(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func2", new TableFunc2)
-
- val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) name, INTEGER len)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "name", "len")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testHierarchyType(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("hierarchy", new HierarchyTableFunction)
-
- val result = table.join(function('c) as ('name, 'adult, 'len))
-
- val expected = unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
- term("joinType", "INNER")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testPojoType(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("pojo", new PojoTableFunc)
-
- val result = table.join(function('c))
-
- val expected = unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "INTEGER age, VARCHAR(2147483647) name)"),
- term("joinType", "INNER")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testFilter(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func2", new TableFunc2)
-
- val result = table
- .join(function('c) as ('name, 'len))
- .select('c, 'name, 'len)
- .filter('len > 2)
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) name, INTEGER len)"),
- term("joinType", "INNER"),
- term("condition", ">($1, 2)")
- ),
- term("select", "c", "name", "len")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testScalarFunction(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result = table.join(function('c.substring(2)) as 's)
-
- val expected = unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", s"$function(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- )
-
- util.verifyTable(result, expected)
- }
-
- // ----------------------------------------------------------------------------------------------
-
- private def expectExceptionThrown(
- function: => Unit,
- keywords: String,
- clazz: Class[_ <: Throwable] = classOf[ValidationException])
- : Unit = {
- try {
- function
- fail(s"Expected a $clazz, but no exception is thrown.")
- } catch {
- case e if e.getClass == clazz =>
- if (keywords != null) {
- assertTrue(
- s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
- e.getMessage.contains(keywords))
- }
- case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
deleted file mode 100644
index 4fd3cd7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.utils
-
-import java.util.Collections
-
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import scala.collection.JavaConverters._
-
-object StreamITCase {
-
- var testResults = mutable.MutableList.empty[String]
-
- def clear = {
- StreamITCase.testResults.clear()
- }
-
- def compareWithList(expected: java.util.List[String]): Unit = {
- Collections.sort(expected)
- assertEquals(expected.asScala, StreamITCase.testResults.sorted)
- }
-
- final class StringSink extends RichSinkFunction[Row]() {
- def invoke(value: Row) {
- testResults.synchronized {
- testResults += value.toString
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
deleted file mode 100644
index 321b8ac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.utils
-
-import org.apache.flink.api.scala._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
-object StreamTestData {
-
- def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Hi"))
- data.+=((2, 2L, "Hello"))
- data.+=((3, 2L, "Hello world"))
- env.fromCollection(data)
- }
-
- def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Hi"))
- data.+=((2, 2L, "Hello"))
- data.+=((3, 2L, "Hello world"))
- data.+=((4, 3L, "Hello world, how are you?"))
- data.+=((5, 3L, "I am fine."))
- data.+=((6, 3L, "Luke Skywalker"))
- data.+=((7, 4L, "Comment#1"))
- data.+=((8, 4L, "Comment#2"))
- data.+=((9, 4L, "Comment#3"))
- data.+=((10, 4L, "Comment#4"))
- data.+=((11, 5L, "Comment#5"))
- data.+=((12, 5L, "Comment#6"))
- data.+=((13, 5L, "Comment#7"))
- data.+=((14, 5L, "Comment#8"))
- data.+=((15, 5L, "Comment#9"))
- data.+=((16, 6L, "Comment#10"))
- data.+=((17, 6L, "Comment#11"))
- data.+=((18, 6L, "Comment#12"))
- data.+=((19, 6L, "Comment#13"))
- data.+=((20, 6L, "Comment#14"))
- data.+=((21, 6L, "Comment#15"))
- env.fromCollection(data)
- }
-
- def get5TupleDataStream(env: StreamExecutionEnvironment):
- DataStream[(Int, Long, Int, String, Long)] = {
-
- val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
- data.+=((1, 1L, 0, "Hallo", 1L))
- data.+=((2, 2L, 1, "Hallo Welt", 2L))
- data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
- data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
- data.+=((3, 5L, 4, "ABC", 2L))
- data.+=((3, 6L, 5, "BCD", 3L))
- data.+=((4, 7L, 6, "CDE", 2L))
- data.+=((4, 8L, 7, "DEF", 1L))
- data.+=((4, 9L, 8, "EFG", 1L))
- data.+=((4, 10L, 9, "FGH", 2L))
- data.+=((5, 11L, 10, "GHI", 1L))
- data.+=((5, 12L, 11, "HIJ", 3L))
- data.+=((5, 13L, 12, "IJK", 3L))
- data.+=((5, 14L, 13, "JKL", 2L))
- data.+=((5, 15L, 14, "KLM", 2L))
- env.fromCollection(data)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
deleted file mode 100644
index 6c9d2e8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
- * Tests for aggregate plans.
- */
-class AggregationTest extends TableTestBase {
-
- @Test
- def testAggregateQueryBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
-
- val setValues = unaryNode(
- "DataSetValues",
- batchTableNode(0),
- tuples(List(null,null,null)),
- term("values","a","b","c")
- )
- val union = unaryNode(
- "DataSetUnion",
- setValues,
- term("union","a","b","c")
- )
-
- val aggregate = unaryNode(
- "DataSetAggregate",
- union,
- term("select",
- "AVG(a) AS EXPR$0",
- "SUM(b) AS EXPR$1",
- "COUNT(c) AS EXPR$2")
- )
- util.verifySql(sqlQuery, aggregate)
- }
-
- @Test
- def testAggregateWithFilterQueryBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
-
- val calcNode = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", "=(a, 1)")
- )
-
- val setValues = unaryNode(
- "DataSetValues",
- calcNode,
- tuples(List(null,null,null)),
- term("values","a","b","c")
- )
-
- val union = unaryNode(
- "DataSetUnion",
- setValues,
- term("union","a","b","c")
- )
-
- val aggregate = unaryNode(
- "DataSetAggregate",
- union,
- term("select",
- "AVG(a) AS EXPR$0",
- "SUM(b) AS EXPR$1",
- "COUNT(c) AS EXPR$2")
- )
- util.verifySql(sqlQuery, aggregate)
- }
-
- @Test
- def testAggregateGroupQueryBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
-
- val aggregate = unaryNode(
- "DataSetAggregate",
- batchTableNode(0),
- term("groupBy", "a"),
- term("select",
- "a",
- "AVG(a) AS EXPR$0",
- "SUM(b) AS EXPR$1",
- "COUNT(c) AS EXPR$2")
- )
- val expected = unaryNode(
- "DataSetCalc",
- aggregate,
- term("select",
- "EXPR$0",
- "EXPR$1",
- "EXPR$2")
- )
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
-
- val calcNode = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select","a", "b", "c") ,
- term("where","=(a, 1)")
- )
-
- val aggregate = unaryNode(
- "DataSetAggregate",
- calcNode,
- term("groupBy", "a"),
- term("select",
- "a",
- "AVG(a) AS EXPR$0",
- "SUM(b) AS EXPR$1",
- "COUNT(c) AS EXPR$2")
- )
- val expected = unaryNode(
- "DataSetCalc",
- aggregate,
- term("select",
- "EXPR$0",
- "EXPR$1",
- "EXPR$2")
- )
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testAggregateGroupWithFilterTableApi(): Unit = {
-
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val resultTable = sourceTable.groupBy('a)
- .select('a, 'a.avg, 'b.sum, 'c.count)
- .where('a === 1)
-
- val calcNode = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", "=(a, 1)")
- )
-
- val expected = unaryNode(
- "DataSetAggregate",
- calcNode,
- term("groupBy", "a"),
- term("select",
- "a",
- "AVG(a) AS TMP_0",
- "SUM(b) AS TMP_1",
- "COUNT(c) AS TMP_2")
- )
-
- util.verifyTable(resultTable,expected)
- }
-
- @Test
- def testAggregateTableApi(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
- val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
-
- val setValues = unaryNode(
- "DataSetValues",
- batchTableNode(0),
- tuples(List(null,null,null)),
- term("values","a","b","c")
- )
- val union = unaryNode(
- "DataSetUnion",
- setValues,
- term("union","a","b","c")
- )
-
- val expected = unaryNode(
- "DataSetAggregate",
- union,
- term("select",
- "AVG(a) AS TMP_0",
- "SUM(b) AS TMP_1",
- "COUNT(c) AS TMP_2")
- )
- util.verifyTable(resultTable, expected)
- }
-
- @Test
- def testAggregateWithFilterTableApi(): Unit = {
- val util = batchTestUtil()
- val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
- val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
- .select('a.avg,'b.sum,'c.count)
-
- val calcNode = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", "=(a, 1)")
- )
-
- val setValues = unaryNode(
- "DataSetValues",
- calcNode,
- tuples(List(null,null,null)),
- term("values","a","b","c")
- )
-
- val union = unaryNode(
- "DataSetUnion",
- setValues,
- term("union","a","b","c")
- )
-
- val expected = unaryNode(
- "DataSetAggregate",
- union,
- term("select",
- "AVG(a) AS TMP_0",
- "SUM(b) AS TMP_1",
- "COUNT(c) AS TMP_2")
- )
-
- util.verifyTable(resultTable, expected)
- }
-}
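
For context, a minimal end-to-end sketch of the grouped aggregation whose plan the tests above check. It assumes the pre-refactoring packages used in this diff (org.apache.flink.api.table, org.apache.flink.api.scala.table) and the TableEnvironment.sql() entry point of that era; the object name AggregationExample and the sample data are hypothetical.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.types.Row

object AggregationExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical sample data with the same (Int, Long, Int) shape as the tests above.
    val ds = env.fromElements((1, 1L, 1), (1, 2L, 2), (2, 3L, 3))
    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)

    // Same query shape as testAggregateGroupQueryBatchSQL above.
    val result = tEnv.sql("SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a")

    result.toDataSet[Row].print()
  }
}
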
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
deleted file mode 100644
index 2b0d446..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule}
-import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable}
-import org.apache.calcite.tools.RuleSets
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-
-class CalciteConfigBuilderTest {
-
- @Test
- def testDefaultRules(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .build()
-
- assertEquals(false, cc.replacesRuleSet)
- assertFalse(cc.getRuleSet.isDefined)
- }
-
- @Test
- def testReplaceRules(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .build()
-
- assertEquals(true, cc.replacesRuleSet)
- assertTrue(cc.getRuleSet.isDefined)
- val cSet = cc.getRuleSet.get.iterator().asScala.toSet
- assertEquals(1, cSet.size)
- assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
- }
-
- @Test
- def testReplaceAddRules(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
- .build()
-
- assertEquals(true, cc.replacesRuleSet)
- assertTrue(cc.getRuleSet.isDefined)
- val cSet = cc.getRuleSet.get.iterator().asScala.toSet
- assertEquals(3, cSet.size)
- assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
- assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
- assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
- }
-
- @Test
- def testAddRules(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .build()
-
- assertEquals(false, cc.replacesRuleSet)
- assertTrue(cc.getRuleSet.isDefined)
- val cSet = cc.getRuleSet.get.iterator().asScala.toSet
- assertEquals(1, cSet.size)
- assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
- }
-
- @Test
- def testAddAddRules(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
- .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
- .build()
-
- assertEquals(false, cc.replacesRuleSet)
- assertTrue(cc.getRuleSet.isDefined)
- val cSet = cc.getRuleSet.get.iterator().asScala.toSet
- assertEquals(3, cSet.size)
- assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
- assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
- assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
- }
-
- @Test
- def testDefaultOperatorTable(): Unit = {
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .build()
-
- assertEquals(false, cc.replacesSqlOperatorTable)
- assertFalse(cc.getSqlOperatorTable.isDefined)
- }
-
- @Test
- def testReplaceOperatorTable(): Unit = {
-
- val oracleTable = new OracleSqlOperatorTable
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceSqlOperatorTable(oracleTable)
- .build()
-
- val oracleOps = oracleTable.getOperatorList.asScala
-
- assertEquals(true, cc.replacesSqlOperatorTable)
- assertTrue(cc.getSqlOperatorTable.isDefined)
- val ops = cc.getSqlOperatorTable.get.getOperatorList
- .asScala.toSet
- assertEquals(oracleOps.size, ops.size)
- for (o <- oracleOps) {
- assertTrue(ops.contains(o))
- }
- }
-
- @Test
- def testReplaceAddOperatorTable(): Unit = {
-
- val oracleTable = new OracleSqlOperatorTable
- val stdTable = new SqlStdOperatorTable
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .replaceSqlOperatorTable(oracleTable)
- .addSqlOperatorTable(stdTable)
- .build()
-
- val oracleOps = oracleTable.getOperatorList.asScala
- val stdOps = stdTable.getOperatorList.asScala
-
- assertEquals(true, cc.replacesSqlOperatorTable)
- assertTrue(cc.getSqlOperatorTable.isDefined)
- val ops = cc.getSqlOperatorTable.get.getOperatorList
- .asScala.toSet
- assertEquals(oracleOps.size + stdOps.size, ops.size)
- for (o <- oracleOps) {
- assertTrue(ops.contains(o))
- }
- for (o <- stdOps) {
- assertTrue(ops.contains(o))
- }
-
- }
-
- @Test
- def testAddOperatorTable(): Unit = {
-
- val oracleTable = new OracleSqlOperatorTable
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .addSqlOperatorTable(oracleTable)
- .build()
-
- val oracleOps = oracleTable.getOperatorList.asScala
-
- assertEquals(false, cc.replacesSqlOperatorTable)
- assertTrue(cc.getSqlOperatorTable.isDefined)
- val ops = cc.getSqlOperatorTable.get.getOperatorList
- .asScala.toSet
- assertEquals(oracleOps.size, ops.size)
- for (o <- oracleOps) {
- assertTrue(ops.contains(o))
- }
- }
-
- @Test
- def testAddAddOperatorTable(): Unit = {
-
- val oracleTable = new OracleSqlOperatorTable
- val stdTable = new SqlStdOperatorTable
-
- val cc: CalciteConfig = new CalciteConfigBuilder()
- .addSqlOperatorTable(oracleTable)
- .addSqlOperatorTable(stdTable)
- .build()
-
- val oracleOps = oracleTable.getOperatorList.asScala
- val stdOps = stdTable.getOperatorList.asScala
-
- assertEquals(false, cc.replacesSqlOperatorTable)
- assertTrue(cc.getSqlOperatorTable.isDefined)
- val ops = cc.getSqlOperatorTable.get.getOperatorList
- .asScala.toSet
- assertEquals(oracleOps.size + stdOps.size, ops.size)
- for (o <- oracleOps) {
- assertTrue(ops.contains(o))
- }
- for (o <- stdOps) {
- assertTrue(ops.contains(o))
- }
-
- }
-
-
-}
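
For context, a hedged sketch of how a CalciteConfig built with the fluent API exercised above could be wired into a TableEnvironment. It assumes the pre-refactoring packages used in this diff and that TableConfig exposes a setCalciteConfig setter; the object name CalciteConfigExample is hypothetical.

import org.apache.calcite.rel.rules.FilterMergeRule
import org.apache.calcite.sql.fun.OracleSqlOperatorTable
import org.apache.calcite.tools.RuleSets
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.table.{CalciteConfig, CalciteConfigBuilder, TableEnvironment}

object CalciteConfigExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Extend (rather than replace) the default rule set and operator table,
    // mirroring the "add" cases covered by the deleted test.
    val cc: CalciteConfig = new CalciteConfigBuilder()
      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
      .addSqlOperatorTable(new OracleSqlOperatorTable)
      .build()

    // Assumed hook: TableConfig#setCalciteConfig consumes the built config.
    tEnv.getConfig.setCalciteConfig(cc)
  }
}
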
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
deleted file mode 100644
index f14b9d8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-
-class CompositeFlatteningTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testDuplicateFlattening(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
- table.select('a.flatten(), 'a.flatten())
- }
-
- @Test
- def testMultipleFlatteningsTable(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
- val result = table.select('a.flatten(), 'c, 'b.flatten())
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "a._1 AS a$_1",
- "a._2 AS a$_2",
- "c",
- "b._1 AS b$_1",
- "b._2 AS b$_2"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testMultipleFlatteningsSql(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "a._1 AS _1",
- "a._2 AS _2",
- "c",
- "b._1 AS _10",
- "b._2 AS _20"
- )
- )
-
- util.verifySql(
- "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
- expected)
- }
-
- @Test
- def testNestedFlattenings(): Unit = {
- val util = batchTestUtil()
- val table = util
- .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
-
- val result = table.select('a.flatten(), 'b.flatten())
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "a._1 AS a$_1",
- "a._2 AS a$_2",
- "b"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testScalarFunctionAccess(): Unit = {
- val util = batchTestUtil()
- val table = util
- .addTable[(String, Int)]("MyTable", 'a, 'b)
-
- val result = table.select(
- giveMeCaseClass().get("my"),
- giveMeCaseClass().get("clazz"),
- giveMeCaseClass().flatten())
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
- "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
- "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
- "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
-}
-
-object CompositeFlatteningTest {
-
- case class TestCaseClass(my: String, clazz: Int)
-
- object giveMeCaseClass extends ScalarFunction {
- def eval(): TestCaseClass = {
- TestCaseClass("hello", 42)
- }
-
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
- createTypeInformation[TestCaseClass]
- }
- }
-}
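
For context, a self-contained sketch of the pattern the deleted test checks: a ScalarFunction that returns a case class and whose composite result is expanded with flatten(). It assumes the pre-refactoring packages used in this diff; MyResult, makeResult and FlatteningExample are hypothetical names.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Hypothetical composite result type, analogous to TestCaseClass above.
case class MyResult(my: String, clazz: Int)

object makeResult extends ScalarFunction {
  def eval(s: String): MyResult = MyResult(s, s.length)

  // Declare the composite result type, as the deleted test does for giveMeCaseClass.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    createTypeInformation[MyResult]
}

object FlatteningExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val table = env.fromElements(("hello", 1), ("world", 2)).toTable(tEnv, 'a, 'b)

    // flatten() expands the composite UDF result into separate my/clazz columns.
    val result = table.select(makeResult('a).flatten())

    result.toDataSet[Row].print()
  }
}
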