You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:40 UTC
[11/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
deleted file mode 100644
index 195027d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- /** Inner join of the small 3-tuple and the 5-tuple data set on 'b === 'e. */
- @Test
- def testJoin(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Join on the equality of 'b and 'e and keep one column of each side.
-   val actual = left
-     .join(right)
-     .where('b === 'e)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi,Hallo\n" +
-     "Hello,Hallo Welt\n" +
-     "Hello world,Hallo Welt\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Inner join on 'b === 'e combined with the filter 'b < 2; fields are named via `as`. */
- @Test
- def testJoinWithFilter(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   // Field names are assigned with `as` after the conversion instead of inside `toTable`.
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv).as('d, 'e, 'f, 'g, 'h)
-
-   val actual = left
-     .join(right)
-     .where('b === 'e && 'b < 2)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   val expected = "Hi,Hallo\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Inner join on 'b === 'e with the extra predicates 'a < 6 and 'h < 'b. */
- @Test
- def testJoinWithJoinFilter(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // 'h < 'b compares fields of both inputs, 'a < 6 only touches the left one.
-   val actual = left
-     .join(right)
-     .where('b === 'e && 'a < 6 && 'h < 'b)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hello world, how are you?,Hallo Welt wie\n" +
-     "I am fine.,Hallo Welt wie\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Inner join with two equality predicates ('a === 'd and 'b === 'h) given via `filter`. */
- @Test
- def testJoinWithMultipleKeys(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val actual = left
-     .join(right)
-     .filter('a === 'd && 'b === 'h)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi,Hallo\n" +
-     "Hello,Hallo Welt\n" +
-     "Hello world,Hallo Welt wie gehts?\n" +
-     "Hello world,ABC\n" +
-     "I am fine.,HIJ\n" +
-     "I am fine.,IJK\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** A join predicate on the undefined field 'foo must raise a ValidationException. */
- @Test(expected = classOf[ValidationException])
- def testJoinNonExistingKey(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail: neither input declares a field named 'foo.
-   left.join(right).where('foo === 'e).select('c, 'g)
- }
-
- /** Comparing join keys of different types ('a and 'g) must raise a ValidationException. */
- @Test(expected = classOf[ValidationException])
- def testJoinWithNonMatchingKeyTypes(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail: field 'a is Int, and 'g is String.
-   left.join(right).where('a === 'g).select('c, 'g).collect()
- }
-
- /** Joining two tables that both expose a field 'c must raise a ValidationException. */
- @Test(expected = classOf[ValidationException])
- def testJoinWithAmbiguousFields(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   // The last field is deliberately named 'c so that it clashes with the left input.
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'c)
-
-   // Must fail: both inputs share the same field 'c.
-   left.join(right).where('a === 'd).select('c, 'g)
- }
-
- /** An equality between two fields of the same input is expected to raise a TableException. */
- @Test(expected = classOf[TableException])
- def testNoEqualityJoinPredicate1(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail: 'd and 'f are both fields of the right input, so there is no
-   // equality predicate that relates the two join inputs.
-   left.join(right).where('d === 'f).select('c, 'g).collect()
- }
-
- /** A join whose only predicate is the inequality 'a < 'd is expected to raise a TableException. */
- @Test(expected = classOf[TableException])
- def testNoEqualityJoinPredicate2(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail: the condition contains no equality join predicate.
-   left.join(right).where('a < 'd).select('c, 'g).collect()
- }
-
- /** Inner join on 'a === 'd followed by a global count of 'g. */
- @Test
- def testJoinWithAggregation(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
-
-   val expected = "6"
-   // Dot notation instead of the infix `toDataSet[Row] collect()`, as in the other tests.
-   val results = joinT.toDataSet[Row].collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** Inner join on 'a === 'd, grouped by 'a and 'd, aggregating 'b.sum and 'g.count. */
- @Test
- def testJoinWithGroupedAggregation(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val joinT = ds1.join(ds2)
-     .where('a === 'd)
-     .groupBy('a, 'd)
-     .select('b.sum, 'g.count)
-
-   val expected = "6,3\n" + "4,2\n" + "1,1"
-   // Dot notation instead of the infix `toDataSet[Row] collect()`, as in the other tests.
-   val results = joinT.toDataSet[Row].collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /**
-  * Three-way join: the first join uses the constant condition `Literal(true)`, the
-  * equality predicates 'a === 'd and 'e === 'k are only supplied after the second join.
-  */
- @Test
- def testJoinPushThroughJoin(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-   val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
-
-   val joinT = ds1.join(ds2)
-     .where(Literal(true))
-     .join(ds3)
-     .where('a === 'd && 'e === 'k)
-     .select('a, 'f, 'l)
-
-   val expected = "2,1,Hello\n" + "2,1,Hello world\n" + "1,0,Hi"
-   // Dot notation instead of the infix `toDataSet[Row] collect()`, as in the other tests.
-   val results = joinT.toDataSet[Row].collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** Inner join on 'a === 'd combined with the disjunction 'b === 'e || 'b === 'e - 10. */
- @Test
- def testJoinWithDisjunctivePred(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
-
-   val expected = "Hi,Hallo\n" +
-     "Hello,Hallo Welt\n" +
-     "I am fine.,IJK"
-   // Dot notation instead of the infix `toDataSet[Row] collect()`, as in the other tests.
-   val results = joinT.toDataSet[Row].collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** Inner join whose equality predicates compare arithmetic expressions, not plain fields. */
- @Test
- def testJoinWithExpressionPreds(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-
-   val expected = "I am fine.,Hallo Welt\n" +
-     "Luke Skywalker,Hallo Welt wie gehts?\n" +
-     "Luke Skywalker,ABC\n" +
-     "Comment#2,HIJ\n" +
-     "Comment#2,IJK"
-   // Dot notation instead of the infix `toDataSet[Row] collect()`, as in the other tests.
-   val results = joinT.toDataSet[Row].collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** Joining tables registered in two different TableEnvironments must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testJoinTablesFromDifferentEnvs(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   // Two separate table environments on top of the same execution environment.
-   val firstTableEnv = TableEnvironment.getTableEnvironment(env, config)
-   val secondTableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstTableEnv, 'a, 'b, 'c)
-   val right =
-     CollectionDataSets.get5TupleDataSet(env).toTable(secondTableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail. Tables are bound to different TableEnvironments.
-   left.join(right).where('b === 'e).select('c, 'g)
- }
-
- /** Left outer join on 'a === 'd && 'b === 'h with null checking enabled. */
- @Test
- def testLeftJoinWithMultipleKeys(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-   tableEnv.getConfig.setNullCheck(true)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val actual = left
-     .leftOuterJoin(right, 'a === 'd && 'b === 'h)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   // Rows ending in "null" are left rows for which 'g has no value.
-   val expected =
-     "Hi,Hallo\n" + "Hello,Hallo Welt\n" +
-     "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" +
-     "Hello world, how are you?,null\n" +
-     "I am fine.,HIJ\n" + "I am fine.,IJK\n" +
-     "Luke Skywalker,null\n" +
-     "Comment#1,null\n" + "Comment#2,null\n" + "Comment#3,null\n" +
-     "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
-     "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" +
-     "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
-     "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /**
-  * Expects a ValidationException for a left outer join whose condition is
-  * 'b === 'd && 'b < 3.
-  *
-  * NOTE(review): the test name suggests a missing join condition, but a condition is
-  * supplied; presumably the single-input predicate 'b < 3 is what gets rejected. Confirm
-  * against the join validation logic.
-  */
- @Test(expected = classOf[ValidationException])
- def testNoJoinCondition(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-   tEnv.getConfig.setNullCheck(true)
-
-   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail. The result is not bound to a local because it is never used.
-   ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
- }
-
- /**
-  * Expects a ValidationException for a left outer join whose only predicate is the
-  * inequality 'b < 'd, i.e. a condition without any equality predicate.
-  */
- @Test(expected = classOf[ValidationException])
- def testNoEquiJoin(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-   tEnv.getConfig.setNullCheck(true)
-
-   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // Must fail. The result is not bound to a local because it is never used.
-   ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
- }
-
- /** Right outer join with the condition given as the string "a = d && b = h". */
- @Test
- def testRightJoinWithMultipleKeys(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-   tableEnv.getConfig.setNullCheck(true)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   // The join condition is passed as a string expression here, not as a Scala expression.
-   val actual = left
-     .rightOuterJoin(right, "a = d && b = h")
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   // Rows starting with "null" are right rows for which 'c has no value.
-   val expected =
-     "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-     "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" +
-     "null,BCD\n" + "null,CDE\n" + "null,DEF\n" +
-     "null,EFG\n" + "null,FGH\n" + "null,GHI\n" +
-     "I am fine.,HIJ\n" + "I am fine.,IJK\n" +
-     "null,JKL\n" + "null,KLM\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Right outer join mixing an equality ("a = d") with an inequality ("b < h"). */
- @Test
- def testRightJoinWithNotOnlyEquiJoin(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-   tableEnv.getConfig.setNullCheck(true)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val actual = left
-     .rightOuterJoin(right, "a = d && b < h")
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   val expected = "Hello world,BCD\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Full outer join on 'a === 'd && 'b === 'h with null checking enabled. */
- @Test
- def testFullOuterJoinWithMultipleKeys(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-   tableEnv.getConfig.setNullCheck(true)
-
-   val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
-
-   val actual = left
-     .fullOuterJoin(right, 'a === 'd && 'b === 'h)
-     .select('c, 'g)
-     .toDataSet[Row]
-     .collect()
-
-   // "null" appears in whichever column has no value for the row.
-   val expected =
-     "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-     "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" +
-     "null,BCD\n" + "null,CDE\n" + "null,DEF\n" +
-     "null,EFG\n" + "null,FGH\n" + "null,GHI\n" +
-     "I am fine.,HIJ\n" + "I am fine.,IJK\n" +
-     "null,JKL\n" + "null,KLM\n" +
-     "Luke Skywalker,null\n" +
-     "Comment#1,null\n" + "Comment#2,null\n" + "Comment#3,null\n" +
-     "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
-     "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" +
-     "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
-     "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
-     "Hello world, how are you?,null\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
deleted file mode 100644
index 0d32cb4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- /** `unionAll` of two small 3-tuple tables keeps every row of both inputs. */
- @Test
- def testUnionAll(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   // Same data on both sides, but with different field names.
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f)
-
-   val actual = left
-     .unionAll(right)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi\n" + "Hello\n" + "Hello world\n" +
-     "Hi\n" + "Hello\n" + "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `union` of two small 3-tuple tables yields each value of 'c only once. */
- @Test
- def testUnion(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f)
-
-   val actual = left
-     .union(right)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi\n" +
-     "Hello\n" +
-     "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Chaining two `unionAll` calls over three identical tables triples every row. */
- @Test
- def testTernaryUnionAll(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val first = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val second = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val third = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-
-   val actual = first
-     .unionAll(second)
-     .unionAll(third)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   // One group of three rows per input table.
-   val expected =
-     "Hi\n" + "Hello\n" + "Hello world\n" +
-     "Hi\n" + "Hello\n" + "Hello world\n" +
-     "Hi\n" + "Hello\n" + "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** Chaining two `union` calls over three identical tables yields each 'c value once. */
- @Test
- def testTernaryUnion(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val first = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val second = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val third = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-
-   val actual = first
-     .union(second)
-     .union(third)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi\n" +
-     "Hello\n" +
-     "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `unionAll` of a 3-field table with a 5-field table must raise a ValidationException. */
- @Test(expected = classOf[ValidationException])
- def testUnionDifferentColumnSize(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val threeFields = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val fiveFields =
-     CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'd, 'c, 'e)
-
-   // Must fail. Union inputs have different column size.
-   threeFields.unionAll(fiveFields)
- }
-
- /** `unionAll` of inputs with equal arity but different field types must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testUnionDifferentFieldTypes(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   // Project the 5-tuple table down to three fields so that only the types differ.
-   val right = CollectionDataSets.get5TupleDataSet(env)
-     .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
-     .select('a, 'b, 'c)
-
-   // Must fail. Union inputs have different field types.
-   left.unionAll(right)
- }
-
- /** `unionAll` of tables that belong to two different TableEnvironments must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testUnionTablesFromDifferentEnvs(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   // Two separate table environments on top of the same execution environment.
-   val firstTableEnv = TableEnvironment.getTableEnvironment(env, config)
-   val secondTableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstTableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(secondTableEnv, 'a, 'b, 'c)
-
-   // Must fail. Tables are bound to different TableEnvironments.
-   left.unionAll(right).select('c)
- }
-
- /** `minusAll` subtracts two copies of the "Hi" row from three copies of the small table. */
- @Test
- def testMinusAll(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val small = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val hiOnly = env.fromElements((1, 1L, "Hi")).toTable(tableEnv, 'a, 'b, 'c)
-
-   // Left side: the small table three times. Right side: the single "Hi" row twice.
-   val minuend = small.unionAll(small).unionAll(small)
-   val subtrahend = hiOnly.unionAll(hiOnly)
-
-   val actual = minuend
-     .minusAll(subtrahend)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi\n" +
-     "Hello\n" + "Hello world\n" +
-     "Hello\n" + "Hello world\n" +
-     "Hello\n" + "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `minus` removes the "Hi" row and yields each remaining 'c value once. */
- @Test
- def testMinus(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val small = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val hiOnly = env.fromElements((1, 1L, "Hi")).toTable(tableEnv, 'a, 'b, 'c)
-
-   // Left side: the small table three times. Right side: the single "Hi" row twice.
-   val minuend = small.unionAll(small).unionAll(small)
-   val subtrahend = hiOnly.unionAll(hiOnly)
-
-   val actual = minuend
-     .minus(subtrahend)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hello\n" +
-     "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `minus` of inputs with equal arity but different field types must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testMinusDifferentFieldTypes(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   // Project the 5-tuple table down to three fields so that only the types differ.
-   val right = CollectionDataSets.get5TupleDataSet(env)
-     .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
-     .select('a, 'b, 'c)
-
-   // Must fail. Minus inputs have different field types.
-   left.minus(right)
- }
-
- /** `minus` works when the right input uses other field names ('d, 'e, 'f) than the left. */
- @Test
- def testMinusDifferentFieldNames(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val small = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val hiOnly = env.fromElements((1, 1L, "Hi")).toTable(tableEnv, 'd, 'e, 'f)
-
-   val minuend = small.unionAll(small).unionAll(small)
-   val subtrahend = hiOnly.unionAll(hiOnly)
-
-   // The projection uses 'c, a field name of the left input.
-   val actual = minuend
-     .minus(subtrahend)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hello\n" +
-     "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `minusAll` of tables that belong to two different TableEnvironments must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testMinusAllTablesFromDifferentEnvs(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   // Two separate table environments on top of the same execution environment.
-   val firstTableEnv = TableEnvironment.getTableEnvironment(env, config)
-   val secondTableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstTableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(secondTableEnv, 'a, 'b, 'c)
-
-   // Must fail. Tables are bound to different TableEnvironments.
-   left.minusAll(right).select('c)
- }
-
- /**
-  * `intersect` of the small 3-tuple table with a shuffled in-memory collection that
-  * contains a duplicated row and a row with a different string ("Hello world!").
-  */
- @Test
- def testIntersect(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-   // Immutable literal instead of a MutableList that was filled element by element.
-   val data = Seq(
-     (1, 1L, "Hi"),
-     (2, 2L, "Hello"),
-     (2, 2L, "Hello"),
-     (3, 2L, "Hello world!"))
-   // The input order is randomised on every run.
-   val ds2 = env.fromCollection(Random.shuffle(data)).toTable(tEnv, 'a, 'b, 'c)
-
-   val intersectDS = ds1.intersect(ds2).select('c).toDataSet[Row]
-
-   val results = intersectDS.collect()
-
-   val expected = "Hi\n" + "Hello\n"
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** `intersectAll` of the multisets {1, 1, 1, 2, 2} and {1, 2, 2, 2, 3}. */
- @Test
- def testIntersectAll(): Unit = {
-   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-   val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   // Immutable literals instead of MutableLists filled through the multi-argument `+=`.
-   val data1 = Seq(1, 1, 1, 2, 2)
-   val data2 = Seq(1, 2, 2, 2, 3)
-   val ds1 = env.fromCollection(data1).toTable(tEnv, 'c)
-   val ds2 = env.fromCollection(data2).toTable(tEnv, 'c)
-
-   val intersectDS = ds1.intersectAll(ds2).select('c).toDataSet[Row]
-
-   val expected = "1\n2\n2"
-   val results = intersectDS.collect()
-   TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- /** `intersect` works when the right input uses other field names ('e, 'f, 'g). */
- @Test
- def testIntersectWithDifferentFieldNames(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'e, 'f, 'g)
-
-   // The projection uses 'c, a field name of the left input.
-   val actual = left
-     .intersect(right)
-     .select('c)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "Hi\n" +
-     "Hello\n" +
-     "Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-
- /** `intersect` of inputs with equal arity but different field types must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testIntersectWithDifferentFieldTypes(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-   // Project the 5-tuple table down to three fields so that only the types differ.
-   val right = CollectionDataSets.get5TupleDataSet(env)
-     .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
-     .select('a, 'b, 'c)
-
-   // Must fail. Intersect inputs have different field types.
-   left.intersect(right)
- }
-
- /** `intersect` of tables that belong to two different TableEnvironments must be rejected. */
- @Test(expected = classOf[ValidationException])
- def testIntersectTablesFromDifferentEnvs(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   // Two separate table environments on top of the same execution environment.
-   val firstTableEnv = TableEnvironment.getTableEnvironment(env, config)
-   val secondTableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(firstTableEnv, 'a, 'b, 'c)
-   val right = CollectionDataSets.getSmall3TupleDataSet(env).toTable(secondTableEnv, 'a, 'b, 'c)
-
-   // Must fail. Tables are bound to different TableEnvironments.
-   left.intersect(right).select('c)
- }
-
- /** `intersect` of two tables whose first column is the computed expression 'a + 1. */
- @Test
- def testIntersectWithScalarExpression(): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-   // Both inputs apply the same projection before they are intersected.
-   val left = CollectionDataSets.getSmall3TupleDataSet(env)
-     .toTable(tableEnv, 'a, 'b, 'c)
-     .select('a + 1, 'b, 'c)
-   val right = CollectionDataSets.get3TupleDataSet(env)
-     .toTable(tableEnv, 'a, 'b, 'c)
-     .select('a + 1, 'b, 'c)
-
-   val actual = left
-     .intersect(right)
-     .toDataSet[Row]
-     .collect()
-
-   val expected =
-     "2,1,Hi\n" +
-     "3,2,Hello\n" +
-     "4,2,Hello world\n"
-   TestBaseUtils.compareResultAsText(actual.asJava, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
deleted file mode 100644
index b3cc054..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SortITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- def getExecutionEnvironment = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(4)
- env
- }
-
- @Test
- def testOrderByDesc(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.desc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- - x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByAsc(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByMultipleFieldsDifferentDirections(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
-
- val expected = sortExpectedly(tupleDataSetStrings)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByOffset(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByOffsetAndFetch(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- - x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByFetch(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
- val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFetchWithoutOrder(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).limit(0, 5)
-
- t.toDataSet[Row].collect()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 285a181..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.api.table.{TableEnvironment, Types}
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.mockito.Mockito._
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testJavaScalaTableAPIEquality(): Unit = {
- // mock
- val ds = mock(classOf[DataSet[Row]])
- val jDs = mock(classOf[JDataSet[Row]])
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- when(ds.javaSet).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
- javaTableEnv.registerTable("MyTable", in2)
-
- // test cross join
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join("func1(c).as(s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test left outer join
- scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'len, 'adult)
- javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.join(pojo('c))
- .select('c, 'name, 'age)
- javaTable = in2.join("pojo(c)")
- .select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = in1.join(func2('c) as ('name, 'len))
- .select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join("func2(c) as (name, len)")
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = in1.join(func1('c.substring(2)) as 's)
- .select('a, 'c, 's)
- javaTable = in2.join("func1(substring(c, 2)) as (s)")
- .select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
- }
-
- @Test
- def testCrossJoin(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result1 = table.join(function('c) as 's).select('c, 's)
-
- val expected1 = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result1, expected1)
-
- // test overloading
-
- val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
- val expected2 = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"$function($$2, '$$')"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result2, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = util.addFunction("func1", new TableFunc1)
-
- val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", s"$function($$2)"),
- term("function", function),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "s")
- )
-
- util.verifyTable(result, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
deleted file mode 100644
index 8d1f653..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.utils
-
-object SortTestUtils {
-
- val tupleDataSetStrings = List((1, 1L, "Hi")
- ,(2, 2L, "Hello")
- ,(3, 2L, "Hello world")
- ,(4, 3L, "Hello world, how are you?")
- ,(5, 3L, "I am fine.")
- ,(6, 3L, "Luke Skywalker")
- ,(7, 4L, "Comment#1")
- ,(8, 4L, "Comment#2")
- ,(9, 4L, "Comment#3")
- ,(10, 4L, "Comment#4")
- ,(11, 5L, "Comment#5")
- ,(12, 5L, "Comment#6")
- ,(13, 5L, "Comment#7")
- ,(14, 5L, "Comment#8")
- ,(15, 5L, "Comment#9")
- ,(16, 6L, "Comment#10")
- ,(17, 6L, "Comment#11")
- ,(18, 6L, "Comment#12")
- ,(19, 6L, "Comment#13")
- ,(20, 6L, "Comment#14")
- ,(21, 6L, "Comment#15"))
-
- def sortExpectedly(dataSet: List[Product])
- (implicit ordering: Ordering[Product]): String =
- sortExpectedly(dataSet, 0, dataSet.length)
-
- def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
- (implicit ordering: Ordering[Product]): String = {
- dataSet
- .sorted(ordering)
- .slice(start, end)
- .mkString("\n")
- .replaceAll("[\\(\\)]", "")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
deleted file mode 100644
index 2ce42d4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-class TableProgramsTestBase(
- mode: TestExecutionMode,
- tableConfigMode: TableConfigMode)
- extends MultipleProgramsTestBase(mode) {
-
- def config: TableConfig = {
- val conf = new TableConfig
- tableConfigMode match {
- case NO_NULL =>
- conf.setNullCheck(false)
- case EFFICIENT =>
- conf.setEfficientTypeUsage(true)
- case _ => // keep default
- }
- conf
- }
-}
-
-object TableProgramsTestBase {
- case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
-
- val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
- val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
- val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
-
- @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
- def parameters(): util.Collection[Array[java.lang.Object]] = {
- Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
deleted file mode 100644
index 5eebb34..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainStreamTest
- extends StreamingMultipleProgramsTestBase {
-
- val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
-
- @Test
- def testFilter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val table = env.fromElements((1, "hello"))
- .toTable(tEnv, 'a, 'b)
- .filter("a % 2 = 0")
-
- val result = replaceString(tEnv.explain(table))
-
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testFilterStream0.out").mkString
- val expect = replaceString(source)
- assertEquals(result, expect)
- }
-
- @Test
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
- val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
- val table = table1.unionAll(table2)
-
- val result = replaceString(tEnv.explain(table))
-
- val source = scala.io.Source.fromFile(testFilePath +
- "../../src/test/scala/resources/testUnionStream0.out").mkString
- val expect = replaceString(source)
- assertEquals(result, expect)
- }
-
- def replaceString(s: String): String = {
- /* Stage {id} is ignored, because id keeps incrementing in test class
- * while StreamExecutionEnvironment is up
- */
- s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
deleted file mode 100644
index f3eb87c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import java.io.File
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.sinks.CsvTableSink
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.test.util.TestBaseUtils
-
-import org.junit.Test
-
-class TableSinkITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testStreamTableSink(): Unit = {
-
- val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
- tmpFile.deleteOnExit()
- val path = tmpFile.toURI.toString
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(4)
-
- val input = StreamTestData.get3TupleDataStream(env)
- .map(x => x).setParallelism(4) // increase DOP to 4
-
- val results = input.toTable(tEnv, 'a, 'b, 'c)
- .where('a < 5 || 'a > 17)
- .select('c, 'b)
- .writeToSink(new CsvTableSink(path))
-
- env.execute()
-
- val expected = Seq(
- "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
- "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
-
- TestBaseUtils.compareResultsByLinesInMemory(expected, path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
deleted file mode 100644
index 1c93112..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.sources.{CsvTableSource, StreamTableSource}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testStreamTableSourceTableAPI(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
- tEnv.ingest("MyTestTable")
- .where('amount < 4)
- .select('amount * 'id, 'name)
- .toDataStream[Row]
- .addSink(new StreamITCase.StringSink)
-
- env.execute()
-
- val expected = mutable.MutableList(
- "0,Record_0", "0,Record_16", "0,Record_32",
- "1,Record_1", "17,Record_17", "36,Record_18",
- "4,Record_2", "57,Record_19", "9,Record_3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testStreamTableSourceSQL(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
- tEnv.sql(
- "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
- .toDataStream[Row]
- .addSink(new StreamITCase.StringSink)
-
- env.execute()
-
- val expected = mutable.MutableList(
- "0,Record_0", "0,Record_16", "0,Record_32",
- "1,Record_1", "17,Record_17", "36,Record_18",
- "4,Record_2", "57,Record_19", "9,Record_3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testCsvTableSource(): Unit = {
-
- val csvRecords = Seq(
- "First#Id#Score#Last",
- "Mike#1#12.3#Smith",
- "Bob#2#45.6#Taylor",
- "Sam#3#7.89#Miller",
- "Peter#4#0.12#Smith",
- "% Just a comment",
- "Liz#5#34.5#Williams",
- "Sally#6#6.78#Miller",
- "Alice#7#90.1#Smith",
- "Kelly#8#2.34#Williams"
- )
-
- val tempFile = File.createTempFile("csv-test", "tmp")
- tempFile.deleteOnExit()
- val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
- tmpWriter.write(csvRecords.mkString("$"))
- tmpWriter.close()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val csvTable = new CsvTableSource(
- tempFile.getAbsolutePath,
- Array("first", "id", "score", "last"),
- Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
-
- tEnv.registerTableSource("csvTable", csvTable)
- tEnv.sql(
- "SELECT last, score, id FROM csvTable WHERE id < 4 ")
- .toDataStream[Row]
- .addSink(new StreamITCase.StringSink)
-
- env.execute()
-
- val expected = mutable.MutableList(
- "Smith,12.3,1",
- "Taylor,45.6,2",
- "Miller,7.89,3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-
-class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
-
- val fieldTypes: Array[TypeInformation[_]] = Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO
- )
-
- /** Returns the data of the table as a [[DataStream]]. */
- override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
- execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
- }
-
- /** Returns the types of the table fields. */
- override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
- /** Returns the names of the table fields. */
- override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
- /** Returns the [[TypeInformation]] for the return type. */
- override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
- /** Returns the number of fields of the table. */
- override def getNumberOfFields: Int = 3
-}
-
-class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
-
- var running = true
-
- override def run(ctx: SourceContext[Row]): Unit = {
- var cnt = 0L
- while(running && cnt < num) {
- val out = new Row(3)
- out.setField(0, s"Record_$cnt")
- out.setField(1, cnt)
- out.setField(2, (cnt % 16).toInt)
-
- ctx.collect(out)
- cnt += 1
- }
- }
-
- override def cancel(): Unit = {
- running = false
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
deleted file mode 100644
index c4ca964..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.mutable
-
-class SqlITCase extends StreamingMultipleProgramsTestBase {
-
- /** test selection **/
- @Test
- def testSelectExpressionFromTable(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("2,0", "4,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test filtering with registered table **/
- @Test
- def testSimpleFilter(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test filtering with registered datastream **/
- @Test
- def testDatastreamFilter(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
-
- val t = StreamTestData.getSmall3TupleDataStream(env)
- tEnv.registerDataStream("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union with registered tables **/
- @Test
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT * FROM T1 " +
- "UNION ALL " +
- "SELECT * FROM T2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi", "1,1,Hi",
- "2,2,Hello", "2,2,Hello",
- "3,2,Hello world", "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union with filter **/
- @Test
- def testUnionWithFilter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
- "UNION ALL " +
- "SELECT * FROM T2 WHERE a = 2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union of a table and a datastream **/
- @Test
- def testUnionTableWithDataSet(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
- "UNION ALL " +
- "SELECT c FROM T2 WHERE a = 2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.get3TupleDataStream(env)
- tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("Hello", "Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 21629e4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
-import org.apache.flink.api.table.utils._
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testCrossJoin(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
-
- // test overloading
-
- val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
- val expected2 = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c, '$')"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery2, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testCustomType(): Unit = {
- val util = streamTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testHierarchyType(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new HierarchyTableFunction
- util.addFunction("hierarchy", function)
-
- val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "hierarchy($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testPojoType(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new PojoTableFunc
- util.addFunction("pojo", function)
-
- val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "pojo($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " INTEGER age, VARCHAR(2147483647) name)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "name", "age")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testFilter(): Unit = {
- val util = streamTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
- "WHERE len > 2"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER"),
- term("condition", ">($1, 2)")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testScalarFunction(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
deleted file mode 100644
index d398556..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table._
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * We only test some aggregations until better testing of constructed DataStream
- * programs is possible.
- */
-class AggregationsITCase extends StreamingMultipleProgramsTestBase {
-
- val data = List(
- (1L, 1, "Hi"),
- (2L, 2, "Hello"),
- (4L, 2, "Hello"),
- (8L, 3, "Hello world"),
- (16L, 3, "Hello world"))
-
- @Test
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 2.rows every 1.rows)
- .select('string, 'int.count, 'int.avg)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSessionGroupWindowOverTime(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Session withGap 7.milli on 'rowtime)
- .select('string, 'int.count)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .window(Tumble over 2.rows)
- .select('int.count)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq("2", "2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeTumblingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Tumble over 5.milli on 'rowtime as 'w)
- .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq(
- "Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
- "Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
- "Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testEventTimeSlidingWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val stream = env
- .fromCollection(data)
- .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
- val windowedTable = table
- .groupBy('string)
- .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
- .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
-
- val results = windowedTable.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = Seq(
- "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
- "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
- "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
- "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
- "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
- "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
- "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
-
-object GroupWindowITCase {
- class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
- override def checkAndGetNextWatermark(
- lastElement: (Long, Int, String),
- extractedTimestamp: Long)
- : Watermark = {
- new Watermark(extractedTimestamp)
- }
-
- override def extractTimestamp(
- element: (Long, Int, String),
- previousElementTimestamp: Long): Long = {
- element._1
- }
- }
-}