Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:31 UTC
[22/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
deleted file mode 100644
index 2e8f206..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2}
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.utils.UserDefinedFunctionTestUtils
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testSimpleSelectAll(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSelectFirst(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("1", "2", "3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectWithNaming(): Unit = {
-
- // verify ProjectMergeRule.
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
- .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
- .select('a, 'b)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
- "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
- "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectAllWithAs(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('a, 'b, 'c)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleFilter(): Unit = {
- /*
- * Test simple filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter('a === 3)
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllRejectingFilter(): Unit = {
- /*
- * Test all-rejecting filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(false) )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
- /*
- * Test all-passing filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(true) )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testFilterOnIntegerTupleField(): Unit = {
- /*
- * Test filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 === 0 )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "2,2,Hello", "4,3,Hello world, how are you?",
- "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
- "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
- "18,6,Comment#12", "20,6,Comment#14")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testNotEquals(): Unit = {
- /*
- * Test inequality filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 !== 0)
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- val expected = mutable.MutableList(
- "1,1,Hi", "3,2,Hello world",
- "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
- "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
- "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUserDefinedFunctionWithParameter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.registerFunction("RichFunc2", new RichFunc2)
- UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "ABC"))
-
- StreamITCase.testResults = mutable.MutableList()
-
- val result = StreamTestData.get3TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c)
- .where("RichFunc2(c)='ABC#Hello'")
- .select('c)
-
- val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("Hello")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testMultipleUserDefinedFunctions(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.registerFunction("RichFunc1", new RichFunc1)
- tEnv.registerFunction("RichFunc2", new RichFunc2)
- UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "Abc"))
-
- StreamITCase.testResults = mutable.MutableList()
-
- val result = StreamTestData.get3TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c)
- .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
- .select('c)
-
- val results = result.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("Hello", "Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
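
The two UDF tests above register RichFunc1 and RichFunc2, whose implementations are not part of this diff. As a rough sketch only (the class name and the "#" concatenation are assumptions inferred from the expected "ABC#Hello" result, not the actual Flink test utilities), a parameterized scalar function of that shape can read its job parameter in open():

    import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}

    // Hypothetical sketch: prefixes its input with a value taken from the job parameters,
    // which is what would make "string.value" -> "ABC" produce "ABC#Hello" above.
    class ParameterizedPrefixFunc extends ScalarFunction {
      private var prefix: String = _

      override def open(context: FunctionContext): Unit = {
        // falls back to the default value if the parameter is not set
        prefix = context.getJobParameter("string.value", "")
      }

      def eval(s: String): String = prefix + "#" + s
    }

Registered via tEnv.registerFunction(...), such a function can then be referenced in string expressions like the where("RichFunc2(c)='ABC#Hello'") call used in the test.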
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
deleted file mode 100644
index 8df14e8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.StreamITCase.RetractingSink
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * Tests of groupby (without window) aggregations
- */
-class GroupAggregationsITCase extends StreamingWithStateTestBase {
- private val queryConfig = new StreamQueryConfig()
- queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
-
- @Test
- def testDistinct(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('b).distinct()
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testDistinctAfterAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .groupBy('e).select('e, 'a.count).distinct()
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = mutable.MutableList("1,5", "2,7", "3,3")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testNonKeyedGroupAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('a.sum, 'b.sum)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = List("231,91")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testGroupAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .groupBy('b)
- .select('b, 'a.sum)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink)
- env.execute()
-
- val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testDoubleGroupAggregation(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .groupBy('b)
- .select('a.count as 'cnt, 'b)
- .groupBy('cnt)
- .select('cnt, 'b.count as 'freq)
-
- val results = t.toRetractStream[Row](queryConfig)
-
- results.addSink(new RetractingSink)
- env.execute()
- val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testGroupAggregateWithExpression(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .groupBy('e, 'b % 3)
- .select('c.min, 'e, 'a.avg, 'd.count)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new RetractingSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "0,1,1,1", "7,1,4,2", "2,1,3,2",
- "3,2,3,3", "1,2,3,3", "14,2,5,1",
- "12,3,5,1", "5,3,4,2")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
deleted file mode 100644
index 1af85d9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.runtime.datastream.table.GroupWindowAggregationsITCase._
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * We only test some aggregations until better testing of constructed DataStream
- * programs is possible.
- */
-class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
- private val queryConfig = new StreamQueryConfig()
- queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
- val data = List(
- (1L, 1, "Hi"),
- (2L, 2, "Hello"),
- (4L, 2, "Hello"),
- (8L, 3, "Hello world"),
- (16L, 3, "Hello world"))
-
- val data2 = List(
- (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
- (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
- (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
- (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
- (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
- (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
- (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
-
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvg
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
- .groupBy('w, 'string)
- .select('string, countFun('int), 'int.avg,
- weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
- val results = windowedTable.toAppendStream[Row](queryConfig)
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
- "Hello,2,2,3,2", "Hi,1,1,1,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- // To verify the "merge" functionality, this test has the following characteristics:
- // 1. the parallelism is set to 1 and the test data is out of order
- // 2. the watermark is created with a 10 ms offset to delay the window emission by 10 ms
- val sessionWindowTestdata = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (8L, 8, "Hello"),
- (9L, 9, "Hello World"),
- (4L, 4, "Hello"),
- (16L, 16, "Hello"))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvgWithMerge
-
- val stream = env
- .fromCollection(sessionWindowTestdata)
- .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
-
- val windowedTable = table
- .window(Session withGap 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, countFun('int), 'int.avg,
- weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvg
-
- val windowedTable = table
- .window(Tumble over 2.rows on 'proctime as 'w)
- .groupBy('w)
- .select(countFun('string), 'int.avg,
- weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
- val results = windowedTable.toAppendStream[Row](queryConfig)
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq("2,1,1,1", "2,2,6,2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeTumblingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](0L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvg
-
- val windowedTable = table
- .window(Tumble over 5.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
- weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
- "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
- "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testGroupWindowWithoutKeyInProjection(): Unit = {
- val data = List(
- (1L, 1, "Hi", 1, 1),
- (2L, 2, "Hello", 2, 2),
- (4L, 2, "Hello", 2, 2),
- (8L, 3, "Hello world", 3, 3),
- (16L, 3, "Hello world", 3, 3))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
-
- val weightAvgFun = new WeightedAvg
-
- val windowedTable = table
- .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
- .groupBy('w, 'int2, 'int3, 'string)
- .select(weightAvgFun('long, 'int))
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq("12", "8", "2", "3", "1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-
-
- // ----------------------------------------------------------------------------------------------
- // Sliding windows
- // ----------------------------------------------------------------------------------------------
-
- @Test
- def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
- // please keep this test in sync with the DataSet variant
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
- val windowedTable = table
- .window(Slide over 5.milli every 2.milli on 'long as 'w)
- .groupBy('w)
- .select('int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
- "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
- "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
- "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
- "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
- "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
- "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
- "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
- "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
- // please keep this test in sync with the DataSet variant
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
- val windowedTable = table
- .window(Slide over 10.milli every 5.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
- "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
- "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
- "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
- "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
- "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
- "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
- "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
- "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
- // please keep this test in sync with the DataSet variant
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
- val windowedTable = table
- .window(Slide over 5.milli every 4.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
- "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
- "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
- "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
- // please keep this test in sync with the DataSet variant
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
- val windowedTable = table
- .window(Slide over 5.milli every 10.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
- // please keep this test in sync with the DataSet variant
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
- val windowedTable = table
- .window(Slide over 3.milli every 10.milli on 'long as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data2)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
- .map(t => (t._2, t._6))
- val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime)
-
- val windowedTable = table
- .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
- .groupBy('w, 'string)
- .select('string, 'int.count, 'w.start, 'w.end)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- val expected = Seq(
- "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-object GroupWindowAggregationsITCase {
-
- class TimestampAndWatermarkWithOffset[T <: Product](
- offset: Long) extends AssignerWithPunctuatedWatermarks[T] {
-
- override def checkAndGetNextWatermark(
- lastElement: T,
- extractedTimestamp: Long): Watermark = {
- new Watermark(extractedTimestamp - offset)
- }
-
- override def extractTimestamp(
- element: T,
- previousElementTimestamp: Long): Long = {
- element.productElement(0).asInstanceOf[Long]
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
deleted file mode 100644
index 531e26f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.JavaFunc0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.table.runtime.datastream.table.OverWindowITCase._
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
- @Test
- def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
-
- val data = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (3L, 3, "Hello"),
- (4L, 4, "Hello"),
- (5L, 5, "Hello"),
- (6L, 6, "Hello"),
- (7L, 7, "Hello World"),
- (8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- StreamITCase.clear
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvg
-
- val windowedTable = table
- .window(
- Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
- .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg)
- .select('c, 'mycount, 'wAvg)
-
- val results = windowedTable.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = Seq(
- "Hello World,1,7", "Hello World,2,7", "Hello World,3,14",
- "Hello,1,1", "Hello,2,1", "Hello,3,2", "Hello,4,3", "Hello,5,3", "Hello,6,4")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
- StreamITCase.clear
- env.setParallelism(1)
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (1, 1L, "Hello")),
- Left(14000002L, (1, 2L, "Hello")),
- Left(14000002L, (1, 3L, "Hello world")),
- Left(14000003L, (2, 2L, "Hello world")),
- Left(14000003L, (2, 3L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 4L, "Hello world")),
- Left(14000022L, (1, 5L, "Hello world")),
- Left(14000022L, (1, 6L, "Hello world")),
- Left(14000022L, (1, 7L, "Hello world")),
- Left(14000023L, (2, 4L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000030L)
- )
- val table = env
- .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
- val countFun = new CountAggFunction
- val weightAvgFun = new WeightedAvg
- val plusOne = new JavaFunc0
-
- val windowedTable = table
- .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
- CURRENT_RANGE as 'w)
- .select(
- 'a, 'b, 'c,
- 'b.sum over 'w,
- "SUM:".toExpr + ('b.sum over 'w),
- countFun('b) over 'w,
- (countFun('b) over 'w) + 1,
- plusOne(countFun('b) over 'w),
- array('b.avg over 'w, 'b.max over 'w),
- 'b.avg over 'w,
- 'b.max over 'w,
- 'b.min over 'w,
- ('b.min over 'w).abs(),
- weightAvgFun('b, 'a) over 'w)
-
- val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
- "1,2,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
- "1,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
- "1,1,Hi,7,SUM:7,4,5,5,[1, 3],1,3,1,1,1",
- "2,1,Hello,1,SUM:1,1,2,2,[1, 1],1,1,1,1,1",
- "2,2,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
- "2,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
- "1,4,Hello world,11,SUM:11,5,6,6,[2, 4],2,4,1,1,2",
- "1,5,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
- "1,6,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
- "1,7,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
- "2,4,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3",
- "2,5,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3"
- )
-
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
- val data = List(
- (1, 1L, 0, "Hallo", 1L),
- (2, 2L, 1, "Hallo Welt", 2L),
- (2, 3L, 2, "Hallo Welt wie", 1L),
- (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
- (3, 5L, 4, "ABC", 2L),
- (3, 6L, 5, "BCD", 3L),
- (4, 7L, 6, "CDE", 2L),
- (4, 8L, 7, "DEF", 1L),
- (4, 9L, 8, "EFG", 1L),
- (4, 10L, 9, "FGH", 2L),
- (5, 11L, 10, "GHI", 1L),
- (5, 12L, 11, "HIJ", 3L),
- (5, 13L, 12, "IJK", 3L),
- (5, 14L, 13, "JKL", 2L),
- (5, 15L, 14, "KLM", 2L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-
- val windowedTable = table
- .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
- .select('a, 'c.sum over 'w, 'c.min over 'w)
- val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,0,0",
- "2,1,1",
- "2,3,1",
- "3,3,3",
- "3,7,3",
- "3,12,3",
- "4,6,6",
- "4,13,6",
- "4,21,6",
- "4,30,6",
- "5,10,10",
- "5,21,10",
- "5,33,10",
- "5,46,10",
- "5,60,10")
-
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRowOver(): Unit = {
- val data = Seq(
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((3L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Right(2L),
- Left((3L, (3L, 3, "Hello"))),
- Left((4L, (4L, 4, "Hello"))),
- Left((5L, (5L, 5, "Hello"))),
- Left((6L, (6L, 6, "Hello"))),
- Left((20L, (20L, 20, "Hello World"))),
- Right(6L),
- Left((8L, (8L, 8, "Hello World"))),
- Left((7L, (7L, 7, "Hello World"))),
- Right(20L))
-
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val table = env.addSource[(Long, Int, String)](
- new RowTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- val windowedTable = table
- .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
- .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
-
- val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15",
- "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
- "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRangeOver(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6500L, (6L, 65, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((9500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 9, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val table = env.addSource[(Long, Int, String)](
- new RowTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- val windowedTable = table
- .window(
- Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
- .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
-
- val result = windowedTable.toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-object OverWindowITCase {
-
- class RowTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
-
- override def cancel(): Unit = ???
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
deleted file mode 100644
index e8cfd75..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.table.utils.TableFunc0
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-
-/**
- * tests for retraction
- */
-class RetractionITCase extends StreamingWithStateTestBase {
- // input data
- val data = List(
- ("Hello", 1),
- ("word", 1),
- ("Hello", 1),
- ("bark", 1),
- ("bark", 1),
- ("bark", 1),
- ("bark", 1),
- ("bark", 1),
- ("bark", 1),
- ("flink", 1)
- )
-
- // keyed groupby + keyed groupby
- @Test
- def testWordCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
- env.setStateBackend(getStateBackend)
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'word, 'num)
- val resultTable = table
- .groupBy('word)
- .select('num.sum as 'count)
- .groupBy('count)
- .select('count, 'count.count as 'frequency)
-
- val results = resultTable.toRetractStream[Row]
- results.addSink(new StreamITCase.RetractingSink)
- env.execute()
-
- val expected = Seq("1,2", "2,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- // keyed groupby + non-keyed groupby
- @Test
- def testGroupByAndNonKeyedGroupBy(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
- env.setStateBackend(getStateBackend)
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'word, 'num)
- val resultTable = table
- .groupBy('word)
- .select('word as 'word, 'num.sum as 'cnt)
- .select('cnt.sum)
-
- val results = resultTable.toRetractStream[Row]
-
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = Seq("10")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- // non-keyed groupby + keyed groupby
- @Test
- def testNonKeyedGroupByAndGroupBy(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
- env.setStateBackend(getStateBackend)
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'word, 'num)
- val resultTable = table
- .select('num.sum as 'count)
- .groupBy('count)
- .select('count, 'count.count)
-
- val results = resultTable.toRetractStream[Row]
- results.addSink(new StreamITCase.RetractingSink)
- env.execute()
-
- val expected = Seq("10,1")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- // Tests the uniqueness handling: if the current output message of the unbounded groupby equals
- // the previous message, the unbounded groupby ignores the current one.
- @Test
- def testUniqueProcess(): Unit = {
- // data input
- val data = List(
- (1, 1L),
- (2, 2L),
- (3, 3L),
- (3, 3L),
- (4, 1L),
- (4, 0L),
- (4, 0L),
- (4, 0L),
- (5, 1L),
- (6, 6L),
- (6, 6L),
- (6, 6L),
- (7, 8L)
- )
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'pk, 'value)
- val resultTable = table
- .groupBy('pk)
- .select('pk as 'pk, 'value.sum as 'sum)
- .groupBy('sum)
- .select('sum, 'pk.count as 'count)
-
- val results = resultTable.toRetractStream[Row]
- results.addSink(new StreamITCase.RetractMessagesSink)
- env.execute()
-
- val expected = Seq(
- "+1,1", "+2,1", "+3,1", "-3,1", "+6,1", "-1,1", "+1,2", "-1,2", "+1,3", "-6,1", "+6,2",
- "-6,2", "+6,1", "+12,1", "-12,1", "+18,1", "+8,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- // correlate should handle retraction messages correctly
- @Test
- def testCorrelate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
- env.setStateBackend(getStateBackend)
-
- val func0 = new TableFunc0
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'word, 'num)
- val resultTable = table
- .groupBy('word)
- .select('word as 'word, 'num.sum as 'cnt)
- .leftOuterJoin(func0('word))
- .groupBy('cnt)
- .select('cnt, 'word.count as 'frequency)
-
- val results = resultTable.toRetractStream[Row]
- results.addSink(new StreamITCase.RetractingSink)
- env.execute()
-
- val expected = Seq("1,2", "2,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
-}
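
The retraction tests above assert on StreamITCase.retractedResults, which is populated by the RetractingSink; that test utility is not part of this diff. As a hedged sketch (the class name and the static result buffer are assumptions, and it only works with the in-JVM mini cluster these ITCases run on), a retracting sink consumes the (Boolean, Row) tuples produced by toRetractStream and applies them as inserts and deletes:

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.types.Row

    import scala.collection.mutable

    object SimpleRetractingSink {
      // kept in a static field so the test thread can read it after env.execute()
      val results: mutable.ArrayBuffer[String] = mutable.ArrayBuffer()
    }

    // Hypothetical sketch: true flags are accumulate messages, false flags retract a
    // previously emitted row, so only the final, non-retracted rows remain.
    class SimpleRetractingSink extends RichSinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (accumulate, row) = value
        if (accumulate) {
          SimpleRetractingSink.results += row.toString
        } else {
          val idx = SimpleRetractingSink.results.indexOf(row.toString)
          if (idx >= 0) SimpleRetractingSink.results.remove(idx)
        }
      }
    }

With the sink parallelism set to 1, the buffer ends up holding exactly the rows that were never retracted, which is why the tests can compare it against the expected final state.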
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
deleted file mode 100644
index 8a1e398..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import java.io.File
-import java.lang.{Boolean => JBool}
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.sinks._
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSinksITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testStreamTableSink(): Unit = {
-
- val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
- tmpFile.deleteOnExit()
- val path = tmpFile.toURI.toString
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(4)
-
- val input = StreamTestData.get3TupleDataStream(env)
- .map(x => x).setParallelism(4) // increase DOP to 4
-
- val results = input.toTable(tEnv, 'a, 'b, 'c)
- .where('a < 5 || 'a > 17)
- .select('c, 'b)
- .writeToSink(new CsvTableSink(path))
-
- env.execute()
-
- val expected = Seq(
- "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
- "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
-
- TestBaseUtils.compareResultsByLinesInMemory(expected, path)
- }
-
- @Test
- def testAppendSinkOnAppendTable(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w)
- .select('w.end, 'id.count, 'num.sum)
- .writeToSink(new TestAppendSink)
-
- env.execute()
-
- val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
- val expected = List(
- "1970-01-01 00:00:00.005,4,8",
- "1970-01-01 00:00:00.01,5,18",
- "1970-01-01 00:00:00.015,5,24",
- "1970-01-01 00:00:00.02,5,29",
- "1970-01-01 00:00:00.025,2,12")
- .sorted
- assertEquals(expected, result)
- }
-
- @Test
- def testRetractSinkOnUpdatingTable(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text)
-
- t.select('id, 'num, 'text.charLength() as 'len)
- .groupBy('len)
- .select('len, 'id.count, 'num.sum)
- .writeToSink(new TestRetractSink)
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- val retracted = retractResults(results).sorted
- val expected = List(
- "2,1,1",
- "5,1,2",
- "11,1,2",
- "25,1,3",
- "10,7,39",
- "14,1,3",
- "9,9,41").sorted
- assertEquals(expected, retracted)
-
- }
-
- @Test
- def testRetractSinkOnAppendTable(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w)
- .select('w.end, 'id.count, 'num.sum)
- .writeToSink(new TestRetractSink)
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertFalse(
- "Received retraction messages for append only table",
- results.exists(!_.f0))
-
- val retracted = retractResults(results).sorted
- val expected = List(
- "1970-01-01 00:00:00.005,4,8",
- "1970-01-01 00:00:00.01,5,18",
- "1970-01-01 00:00:00.015,5,24",
- "1970-01-01 00:00:00.02,5,29",
- "1970-01-01 00:00:00.025,2,12")
- .sorted
- assertEquals(expected, retracted)
-
- }
-
- @Test
- def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text)
-
- t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
- .groupBy('len, 'cTrue)
- .select('len, 'id.count as 'cnt, 'cTrue)
- .groupBy('cnt, 'cTrue)
- .select('cnt, 'len.count, 'cTrue)
- .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false))
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertTrue(
- "Results must include delete messages",
- results.exists(_.f0 == false)
- )
-
- val retracted = upsertResults(results, Array(0, 2)).sorted
- val expected = List(
- "1,5,true",
- "7,1,true",
- "9,1,true").sorted
- assertEquals(expected, retracted)
-
- }
-
-
- @Test
- def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w, 'num)
- .select('num, 'w.end as 'wend, 'id.count)
- .writeToSink(new TestUpsertSink(Array("wend", "num"), true))
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertFalse(
- "Received retraction messages for append only table",
- results.exists(!_.f0))
-
- val retracted = upsertResults(results, Array(0, 1, 2)).sorted
- val expected = List(
- "1,1970-01-01 00:00:00.005,1",
- "2,1970-01-01 00:00:00.005,2",
- "3,1970-01-01 00:00:00.005,1",
- "3,1970-01-01 00:00:00.01,2",
- "4,1970-01-01 00:00:00.01,3",
- "4,1970-01-01 00:00:00.015,1",
- "5,1970-01-01 00:00:00.015,4",
- "5,1970-01-01 00:00:00.02,1",
- "6,1970-01-01 00:00:00.02,4",
- "6,1970-01-01 00:00:00.025,2").sorted
- assertEquals(expected, retracted)
- }
-
- @Test
- def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w, 'num)
- .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count)
- .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true))
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertFalse(
- "Received retraction messages for append only table",
- results.exists(!_.f0))
-
- val retracted = upsertResults(results, Array(0, 1, 2)).sorted
- val expected = List(
- "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
- "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
- "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
- "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
- "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
- "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
- "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
- "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
- "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
- "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
- assertEquals(expected, retracted)
- }
-
- @Test
- def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w, 'num)
- .select('w.end as 'wend, 'id.count as 'cnt)
- .writeToSink(new TestUpsertSink(null, true))
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertFalse(
- "Received retraction messages for append only table",
- results.exists(!_.f0))
-
- val retracted = results.map(_.f1.toString).sorted
- val expected = List(
- "1970-01-01 00:00:00.005,1",
- "1970-01-01 00:00:00.005,2",
- "1970-01-01 00:00:00.005,1",
- "1970-01-01 00:00:00.01,2",
- "1970-01-01 00:00:00.01,3",
- "1970-01-01 00:00:00.015,1",
- "1970-01-01 00:00:00.015,4",
- "1970-01-01 00:00:00.02,1",
- "1970-01-01 00:00:00.02,4",
- "1970-01-01 00:00:00.025,2").sorted
- assertEquals(expected, retracted)
- }
-
- @Test
- def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env)
- .assignAscendingTimestamps(_._1.toLong)
- .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
- t.window(Tumble over 5.millis on 'rowtime as 'w)
- .groupBy('w, 'num)
- .select('num, 'id.count as 'cnt)
- .writeToSink(new TestUpsertSink(null, true))
-
- env.execute()
- val results = RowCollector.getAndClearValues
-
- assertFalse(
- "Received retraction messages for append only table",
- results.exists(!_.f0))
-
- val retracted = results.map(_.f1.toString).sorted
- val expected = List(
- "1,1",
- "2,2",
- "3,1",
- "3,2",
- "4,3",
- "4,1",
- "5,4",
- "5,1",
- "6,4",
- "6,2").sorted
- assertEquals(expected, retracted)
- }
-
- /** Converts a list of retraction messages into a list of final results. */
- private def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = {
-
- val retracted = results
- .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) =>
- val cnt = m.getOrElse(v.f1.toString, 0)
- if (v.f0) {
- m + (v.f1.toString -> (cnt + 1))
- } else {
- m + (v.f1.toString -> (cnt - 1))
- }
- }.filter{ case (_, c: Int) => c != 0 }
-
- assertFalse(
- "Received retracted rows which have not been accumulated.",
- retracted.exists{ case (_, c: Int) => c < 0})
-
- retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
- }
-
- /** Converts a list of upsert messages into a list of final results. */
- private def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
-
- def getKeys(r: Row): List[String] =
- keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
-
- val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
- val key = getKeys(r.f1).mkString("")
- if (r.f0) {
- o + (key -> r.f1.toString)
- } else {
- o - key
- }
- }
-
- upserted.values.toList
- }
-
-}
-
-private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
-
- var fNames: Array[String] = _
- var fTypes: Array[TypeInformation[_]] = _
-
- override def emitDataStream(s: DataStream[Row]): Unit = {
- s.map(
- new MapFunction[Row, JTuple2[JBool, Row]] {
- override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
- })
- .addSink(new RowSink)
- }
-
- override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
- override def getFieldNames: Array[String] = fNames
-
- override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
- override def configure(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
- val copy = new TestAppendSink
- copy.fNames = fieldNames
- copy.fTypes = fieldTypes
- copy
- }
-}
-
-private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
-
- var fNames: Array[String] = _
- var fTypes: Array[TypeInformation[_]] = _
-
- override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
- s.addSink(new RowSink)
- }
-
- override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
- override def getFieldNames: Array[String] = fNames
-
- override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
- override def configure(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
- val copy = new TestRetractSink
- copy.fNames = fieldNames
- copy.fTypes = fieldTypes
- copy
- }
-
-}
-
-private[flink] class TestUpsertSink(
- expectedKeys: Array[String],
- expectedIsAppendOnly: Boolean)
- extends UpsertStreamTableSink[Row] {
-
- var fNames: Array[String] = _
- var fTypes: Array[TypeInformation[_]] = _
-
- override def setKeyFields(keys: Array[String]): Unit =
- if (keys != null) {
- assertEquals("Provided key fields do not match expected keys",
- expectedKeys.sorted.mkString(","),
- keys.sorted.mkString(","))
- } else {
- assertNull("Provided key fields should not be null.", expectedKeys)
- }
-
- override def setIsAppendOnly(isAppendOnly: JBool): Unit =
- assertEquals(
- "Provided isAppendOnly does not match expected isAppendOnly",
- expectedIsAppendOnly,
- isAppendOnly)
-
- override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
- override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
- s.addSink(new RowSink)
- }
-
- override def getFieldNames: Array[String] = fNames
-
- override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
- override def configure(
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
- val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
- copy.fNames = fieldNames
- copy.fTypes = fieldTypes
- copy
- }
-}
-
-class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
- override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value)
-}
-
-object RowCollector {
- private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
- new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
-
- def addValue(value: JTuple2[JBool, Row]): Unit = {
- sink.synchronized {
- sink += value
- }
- }
-
- def getAndClearValues: List[JTuple2[JBool, Row]] = {
- val out = sink.toList
- sink.clear()
- out
- }
-
-}
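
The retractResults and upsertResults helpers above fold the (flag, row) messages captured by RowCollector into the final table contents: retraction messages decrement a per-row count, while upsert messages overwrite or delete the entry for their key. The standalone sketch below re-implements the same folding semantics with plain Scala tuples in place of JTuple2 and Row; the object name, method names, and sample messages are made up for illustration and are not part of the deleted test.

    // Simplified model of the messages a retract/upsert sink receives:
    // (true, row) adds or upserts a row, (false, row) retracts or deletes it.
    object MessageFolding {

      /** Retract semantics: count add/retract messages per row, keep rows with count > 0. */
      def collapseRetractions(messages: Seq[(Boolean, String)]): Seq[String] = {
        val counts = messages.foldLeft(Map.empty[String, Int]) { case (m, (isAdd, row)) =>
          val delta = if (isAdd) 1 else -1
          m + (row -> (m.getOrElse(row, 0) + delta))
        }
        counts.collect { case (row, c) if c > 0 => Seq.fill(c)(row) }.flatten.toSeq
      }

      /** Upsert semantics: the last message per key wins; a delete removes the key. */
      def collapseUpserts(messages: Seq[(Boolean, String)], key: String => String): Seq[String] = {
        messages.foldLeft(Map.empty[String, String]) { case (m, (isUpsert, row)) =>
          if (isUpsert) m + (key(row) -> row) else m - key(row)
        }.values.toSeq
      }

      def main(args: Array[String]): Unit = {
        // Two adds and one retraction of "a,1" leave a single copy of it.
        val retractions = Seq((true, "a,1"), (true, "a,1"), (false, "a,1"), (true, "b,2"))
        println(collapseRetractions(retractions).sorted)            // List(a,1, b,2)

        // Key is the first field; "k1" is updated, "k2" is deleted again.
        val upserts = Seq((true, "k1,1"), (true, "k1,2"), (true, "k2,5"), (false, "k2,5"))
        println(collapseUpserts(upserts, _.split(",")(0)).sorted)   // List(k1,2)
      }
    }
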
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
deleted file mode 100644
index 07b748c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.utils.{CommonTestData, TestFilterableTableSource}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testCsvTableSource(): Unit = {
-
- val csvTable = CommonTestData.getCsvTableSource
- StreamITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- tEnv.registerTableSource("csvTable", csvTable)
- tEnv.scan("csvTable")
- .where('id > 4)
- .select('last, 'score * 2)
- .toAppendStream[Row]
- .addSink(new StreamITCase.StringSink[Row])
-
- env.execute()
-
- val expected = mutable.MutableList(
- "Williams,69.0",
- "Miller,13.56",
- "Smith,180.2",
- "Williams,4.68")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testCsvTableSourceWithFilterable(): Unit = {
- StreamITCase.testResults = mutable.MutableList()
- val tableName = "MyTable"
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.registerTableSource(tableName, new TestFilterableTableSource)
- tEnv.scan(tableName)
- .where("amount > 4 && price < 9")
- .select("id, name")
- .addSink(new StreamITCase.StringSink[Row])
-
- env.execute()
-
- val expected = mutable.MutableList(
- "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
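
The CsvTableSource scanned by testCsvTableSource is built by CommonTestData.getCsvTableSource, which is not part of this diff. As a rough sketch of what registering and querying such a source looks like, the fragment below constructs a CsvTableSource directly; the file path, field names, and field types are invented for illustration and do not reflect CommonTestData's actual schema.

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sources.CsvTableSource
    import org.apache.flink.types.Row

    object CsvTableSourceSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical CSV file and schema -- assumed for this sketch only.
        val csvSource = new CsvTableSource(
          "/tmp/people.csv",                 // assumed path
          Array("id", "last", "score"),      // assumed field names
          Array[TypeInformation[_]](
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Register the source and query it the same way the test above does.
        tEnv.registerTableSource("csvTable", csvSource)
        tEnv.scan("csvTable")
          .where('id > 4)
          .select('last, 'score * 2)
          .toAppendStream[Row]
          .print()

        env.execute()
      }
    }
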