You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:46 UTC
[37/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
deleted file mode 100644
index a377ca3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit._
-
-class JoinValidationTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testJoinNonExistingKey(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.join(ds2)
- // must fail. Field 'foo does not exist
- .where('foo === 'e)
- .select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoinWithNonMatchingKeyTypes(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.join(ds2)
- // must fail. Field 'a is Int, and 'g is String
- .where('a === 'g)
- .select('c, 'g).collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoinWithAmbiguousFields(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
-
- ds1.join(ds2)
- // must fail. Both inputs share the same field 'c
- .where('a === 'd)
- .select('c, 'g)
- }
-
- @Test(expected = classOf[TableException])
- def testNoEqualityJoinPredicate1(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.join(ds2)
- // must fail. No equality join predicate
- .where('d === 'f)
- .select('c, 'g).collect()
- }
-
- @Test(expected = classOf[TableException])
- def testNoEqualityJoinPredicate2(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.join(ds2)
- // must fail. No equality join predicate
- .where('a < 'd)
- .select('c, 'g).collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoinTablesFromDifferentEnvs(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
- val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- in1.join(in2).where('b === 'e).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testNoJoinCondition(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testNoEquiJoin(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testJoinNonExistingKeyJava() {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- // Must fail. Field foo does not exist.
- ds1.join(ds2).where("foo === e").select("c, g")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testJoinWithNonMatchingKeyTypesJava() {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
- ds1.join(ds2)
- // Must fail. Types of join fields are not compatible (Integer and String)
- .where("a === g").select("c, g")
- }
-
- @Test(expected = classOf[ValidationException])
- @throws[Exception]
- def testJoinWithAmbiguousFieldsJava() {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
- // Must fail. Join input have overlapping field names.
- ds1.join(ds2).where("a === d").select("c, g")
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoinTablesFromDifferentEnvsJava() {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
- val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
- // Must fail. Tables are bound to different TableEnvironments.
- in1.join(in2).where("a === d").select("g.count")
- }
-
- @Test(expected = classOf[ValidationException])
- def testRightJoinWithNonEquiJoinPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testLeftJoinWithLocalPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testFullJoinWithLocalPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testRightJoinWithLocalPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
- }
-
- @Test(expected = classOf[ValidationException])
- def testFullJoinWithNonEquiJoinPredicate(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
deleted file mode 100644
index c5586bf..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class SetOperatorsValidationTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testUnionDifferentColumnSize(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'd, 'c, 'e)
-
- // must fail. Union inputs have different column size.
- ds1.unionAll(ds2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionDifferentFieldTypes(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
- .select('a, 'b, 'c)
-
- // must fail. Union inputs have different field types.
- ds1.unionAll(ds2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionTablesFromDifferentEnvs(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- ds1.unionAll(ds2).select('c)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinusDifferentFieldTypes(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
- .select('a, 'b, 'c)
-
- // must fail. Minus inputs have different field types.
- ds1.minus(ds2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinusAllTablesFromDifferentEnvs(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- ds1.minusAll(ds2).select('c)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersectWithDifferentFieldTypes(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
- .select('a, 'b, 'c)
-
- // must fail. Intersect inputs have different field types.
- ds1.intersect(ds2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersectTablesFromDifferentEnvs(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- ds1.intersect(ds2).select('c)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
deleted file mode 100644
index a82d4d6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit._
-
-class SortValidationTest extends TableTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testFetchWithoutOrder(): Unit = {
- val util = batchTestUtil()
- val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
- val t = ds.limit(0, 5).toDataSet[Row].collect()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
deleted file mode 100644
index 4a69271..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-object LogicalPlanFormatUtils {
- private val tempPattern = """TMP_\d+""".r
-
- def formatTempTableId(preStr: String): String = {
- val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(")
- val minId = getMinTempTableId(str)
- tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId) )
- }
-
- private def getMinTempTableId(logicalStr: String): Long = {
- val found = tempPattern.findAllIn(logicalStr).map(s => {
- s.substring(4).toInt
- })
- if (found.isEmpty) {
- 0L
- } else {
- found.min
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
deleted file mode 100644
index ef425d3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-object SortTestUtils {
-
- val tupleDataSetStrings = List((1, 1L, "Hi")
- ,(2, 2L, "Hello")
- ,(3, 2L, "Hello world")
- ,(4, 3L, "Hello world, how are you?")
- ,(5, 3L, "I am fine.")
- ,(6, 3L, "Luke Skywalker")
- ,(7, 4L, "Comment#1")
- ,(8, 4L, "Comment#2")
- ,(9, 4L, "Comment#3")
- ,(10, 4L, "Comment#4")
- ,(11, 5L, "Comment#5")
- ,(12, 5L, "Comment#6")
- ,(13, 5L, "Comment#7")
- ,(14, 5L, "Comment#8")
- ,(15, 5L, "Comment#9")
- ,(16, 6L, "Comment#10")
- ,(17, 6L, "Comment#11")
- ,(18, 6L, "Comment#12")
- ,(19, 6L, "Comment#13")
- ,(20, 6L, "Comment#14")
- ,(21, 6L, "Comment#15"))
-
- def sortExpectedly(dataSet: List[Product])
- (implicit ordering: Ordering[Product]): String =
- sortExpectedly(dataSet, 0, dataSet.length)
-
- def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
- (implicit ordering: Ordering[Product]): String = {
- dataSet
- .sorted(ordering)
- .slice(start, end)
- .mkString("\n")
- .replaceAll("[\\(\\)]", "")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
deleted file mode 100644
index 712b818..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.runtime.dataset.table.DataSetWindowAggregateITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-/**
- * This test base provides full cluster-like integration tests for batch programs. Only runtime
- * operator tests should use this test base as they are expensive.
- * (e.g. [[DataSetWindowAggregateITCase]])
- */
-class TableProgramsClusterTestBase(
- executionMode: TestExecutionMode,
- tableConfigMode: TableConfigMode)
- extends TableProgramsTestBase(executionMode, tableConfigMode) {
-}
-
-object TableProgramsClusterTestBase {
-
- @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
- def parameters(): util.Collection[Array[java.lang.Object]] = {
- Seq[Array[AnyRef]](
- Array(TestExecutionMode.CLUSTER, TableProgramsTestBase.DEFAULT),
- Array(TestExecutionMode.CLUSTER_OBJECT_REUSE, TableProgramsTestBase.DEFAULT)
- )
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
deleted file mode 100644
index cd7c12d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-/**
- * This test base provides lightweight integration tests for batch programs. However, it does
- * not test everything (e.g. combiners). Runtime operator tests should
- * use [[TableProgramsClusterTestBase]].
- */
-class TableProgramsCollectionTestBase(
- tableConfigMode: TableConfigMode)
- extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
-}
-
-object TableProgramsCollectionTestBase {
-
- @Parameterized.Parameters(name = "Table config = {0}")
- def parameters(): util.Collection[Array[java.lang.Object]] = {
- Seq[Array[AnyRef]](Array(TableProgramsTestBase.DEFAULT))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
deleted file mode 100644
index cf9d947..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-
-class TableProgramsTestBase(
- mode: TestExecutionMode,
- tableConfigMode: TableConfigMode)
- extends MultipleProgramsTestBase(mode) {
-
- def config: TableConfig = {
- val conf = new TableConfig
- tableConfigMode match {
- case NO_NULL =>
- conf.setNullCheck(false)
- case _ => // keep default
- }
- conf
- }
-}
-
-object TableProgramsTestBase {
- case class TableConfigMode(nullCheck: Boolean)
-
- val DEFAULT = TableConfigMode(nullCheck = true)
- val NO_NULL = TableConfigMode(nullCheck = false)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
deleted file mode 100644
index 030a415..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.validation
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.{CClass, PojoClass}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.types.Row
-import org.junit.Assert.assertTrue
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class TableEnvironmentValidationTest(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val tupleType = new TupleTypeInfo(
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- DOUBLE_TYPE_INFO)
-
- val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
- val cRowType = new CRowTypeInfo(rowType)
-
- val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
- val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
- val atomicType = INT_TYPE_INFO
-
- val genericRowType = new GenericTypeInfo[Row](classOf[Row])
-
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoPojoNames1(): Unit = {
- tEnv.getFieldInfo(
- pojoType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoAtomicName2(): Unit = {
- tEnv.getFieldInfo(
- atomicType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoTupleAlias3(): Unit = {
- tEnv.getFieldInfo(
- tupleType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoCClassAlias3(): Unit = {
- tEnv.getFieldInfo(
- caseClassType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoPojoAlias3(): Unit = {
- tEnv.getFieldInfo(
- pojoType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoAtomicAlias(): Unit = {
- tEnv.getFieldInfo(
- atomicType,
- Array(
- Alias(UnresolvedFieldReference("name1"), "name2")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoGenericRowAlias(): Unit = {
- tEnv.getFieldInfo(
- genericRowType,
- Array(UnresolvedFieldReference("first")))
- }
-
- @Test(expected = classOf[TableException])
- def testRegisterExistingDataSet(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds1)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- // Must fail. Name is already in use.
- tEnv.registerDataSet("MyTable", ds2)
- }
-
- @Test(expected = classOf[TableException])
- def testScanUnregisteredTable(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- // Must fail. No table registered under that name.
- tEnv.scan("someTable")
- }
-
- @Test(expected = classOf[TableException])
- def testRegisterExistingTable(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", t1)
- val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
- // Must fail. Name is already in use.
- tEnv.registerDataSet("MyTable", t2)
- }
-
- @Test(expected = classOf[TableException])
- def testRegisterTableFromOtherEnv(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
- val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
- val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
- // Must fail. Table is bound to different TableEnvironment.
- tEnv2.registerTable("MyTable", t1)
- }
-
- @Test(expected = classOf[TableException])
- def testToTableWithToManyFields(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Number of fields does not match.
- .toTable(tEnv, 'a, 'b, 'c, 'd)
- }
-
- @Test(expected = classOf[TableException])
- def testToTableWithAmbiguousFields(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Field names not unique.
- .toTable(tEnv, 'a, 'b, 'b)
- }
-
- @Test(expected = classOf[TableException])
- def testToTableWithNonFieldReference1(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- // Must fail. as() can only have field references
- CollectionDataSets.get3TupleDataSet(env)
- .toTable(tEnv, 'a + 1, 'b, 'c)
- }
-
- @Test(expected = classOf[TableException])
- def testToTableWithNonFieldReference2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- // Must fail. as() can only have field references
- CollectionDataSets.get3TupleDataSet(env)
- .toTable(tEnv, 'a as 'foo, 'b, 'c)
- }
-
- @Test(expected = classOf[TableException])
- def testGenericRow() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
- // use null value the enforce GenericType
- val dataSet = env.fromElements(Row.of(null))
- assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
- assertTrue(dataSet.getType().getTypeClass == classOf[Row])
-
- // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
- tableEnv.fromDataSet(dataSet)
- }
-
- @Test(expected = classOf[TableException])
- def testGenericRowWithAlias() {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
- // use null value the enforce GenericType
- val dataSet = env.fromElements(Row.of(null))
- assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
- assertTrue(dataSet.getType().getTypeClass == classOf[Row])
-
- // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
- tableEnv.fromDataSet(dataSet, "nullField")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
deleted file mode 100644
index ea09f32..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream
-
-import java.lang.{Integer => JInt, Long => JLong}
-
-import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
-import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, Types}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-import org.mockito.Mockito.{mock, when}
-
-class TableEnvironmentTest extends TableTestBase {
-
- @Test
- def testProctimeAttribute(): Unit = {
- val util = streamTestUtil()
- // cannot replace an attribute with proctime
- util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime)
- }
-
- @Test
- def testReplacedRowtimeAttribute(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e)
- }
-
- @Test
- def testAppendedRowtimeAttribute(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime)
- }
-
- @Test
- def testRowtimeAndProctimeAttribute1(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime, 'pt.proctime)
- }
-
- @Test
- def testRowtimeAndProctimeAttribute2(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime, 'rt.rowtime)
- }
-
- @Test
- def testRowtimeAndProctimeAttribute3(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e, 'pt.proctime)
- }
-
- @Test
- def testProctimeAttributeParsed(): Unit = {
- val (jTEnv, ds) = prepareSchemaExpressionParser
- jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime")
- }
-
- @Test
- def testReplacingRowtimeAttributeParsed(): Unit = {
- val (jTEnv, ds) = prepareSchemaExpressionParser
- jTEnv.fromDataStream(ds, "a.rowtime, b, c, d, e")
- }
-
- @Test
- def testAppedingRowtimeAttributeParsed(): Unit = {
- val (jTEnv, ds) = prepareSchemaExpressionParser
- jTEnv.fromDataStream(ds, "a, b, c, d, e, rt.rowtime")
- }
-
- @Test
- def testRowtimeAndProctimeAttributeParsed1(): Unit = {
- val (jTEnv, ds) = prepareSchemaExpressionParser
- jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime, rt.rowtime")
- }
-
- @Test
- def testRowtimeAndProctimeAttributeParsed2(): Unit = {
- val (jTEnv, ds) = prepareSchemaExpressionParser
- jTEnv.fromDataStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
- }
-
- private def prepareSchemaExpressionParser:
- (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
-
- val jStreamExecEnv = mock(classOf[JStreamExecEnv])
- when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
- val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
-
- val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
- .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
- val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]])
- when(ds.getType).thenReturn(sType)
-
- (jTEnv, ds)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
deleted file mode 100644
index 1f976b7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
-
-class TableSchemaTest extends TableTestBase {
-
- @Test
- def testStreamTableSchema(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
- val schema = table.getSchema
-
- assertEquals("a", schema.getColumnNames.apply(0))
- assertEquals("b", schema.getColumnNames.apply(1))
-
- assertEquals(Types.INT, schema.getTypes.apply(0))
- assertEquals(Types.STRING, schema.getTypes.apply(1))
-
- val expectedString = "root\n" +
- " |-- a: Integer\n" +
- " |-- b: String\n"
- assertEquals(expectedString, schema.toString)
-
- assertTrue(schema.getColumnName(3).isEmpty)
- assertTrue(schema.getType(-1).isEmpty)
- assertTrue(schema.getType("c").isEmpty)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
deleted file mode 100644
index af9ce74..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceProjectExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- )
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceFilterExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "*" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
deleted file mode 100644
index a79d48f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class OverWindowTest extends TableTestBase {
- private val streamUtil: StreamTableTestUtil = streamTestUtil()
- streamUtil.addTable[(Int, String, Long)](
- "MyTable",
- 'a, 'b, 'c,
- 'proctime.proctime, 'rowtime.rowtime)
-
- @Test
- def testProcTimeBoundedPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as sum1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1, CASE(>(w0$o0, 0), CAST(w0$o1), null) AS sum1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testProcTimeBoundedPartitionedRangeOver() = {
-
- val sqlQuery =
- "SELECT a, " +
- " AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
- " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
- "FROM MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy", "a"),
- term("orderBy", "proctime"),
- term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
- term(
- "select",
- "a",
- "c",
- "proctime",
- "COUNT(c) AS w0$o0",
- "$SUM0(c) AS w0$o1"
- )
- ),
- term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testProcTimeBoundedNonPartitionedRangeOver() = {
-
- val sqlQuery =
- "SELECT a, " +
- " COUNT(c) OVER (ORDER BY proctime " +
- " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
- "FROM MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
- ),
- term("select", "a", "w0$o0 AS $1")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testProcTimeBoundedNonPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW)" +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
-
- @Test
- def testProcTimeUnboundedPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testProcTimeUnboundedPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- streamTableNode(0),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testProcTimeUnboundedNonPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testProcTimeUnboundedNonPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- streamTableNode(0),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
- "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeBoundedNonPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeBoundedNonPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime " +
- "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeUnboundedPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeUnboundedPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term(
- "select",
- "a",
- "c",
- "rowtime",
- "COUNT(a) AS w0$o0",
- "$SUM0(a) AS w0$o1"
- )
- ),
- term(
- "select",
- "c",
- "w0$o0 AS cnt1",
- "CASE(>(w0$o0, 0)",
- "CAST(w0$o1), null) AS cnt2"
- )
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeUnboundedNonPartitionedRangeOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testRowTimeUnboundedNonPartitionedRowsOver() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term(
- "select",
- "a",
- "c",
- "rowtime",
- "COUNT(a) AS w0$o0",
- "$SUM0(a) AS w0$o1"
- )
- ),
- term(
- "select",
- "c",
- "w0$o0 AS cnt1",
- "CASE(>(w0$o0, 0)",
- "CAST(w0$o1), null) AS cnt2"
- )
- )
- streamUtil.verifySql(sql, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
deleted file mode 100644
index 2340591..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.mutable
-
-class SortITCase extends StreamingWithStateTestBase {
-
- @Test
- def testEventTimeOrderBy(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Left((2000L, (3L, 1, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 65, "Hello"))),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6000L, (6L, 67, "Hello"))),
- Left((6000L, (6L, -1, "Hello"))),
- Left((6000L, (6L, 6, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((8500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 7, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
-
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
- env.execute()
-
- val expected = mutable.MutableList(
- "1", "15", "16",
- "1", "2", "2", "3",
- "3",
- "4",
- "5",
- "-1", "6", "6", "65", "67",
- "18", "7", "9",
- "7", "17", "77",
- "18",
- "8",
- "20")
- assertEquals(expected, SortITCase.testResults)
- }
-}
-
-object SortITCase {
-
- final class StringRowSelectorSink(private val field:Int) extends RichSinkFunction[Row]() {
- def invoke(value: Row) {
- testResults.synchronized {
- testResults += value.getField(field).toString
- }
- }
- }
-
- var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
deleted file mode 100644
index 1d50fc1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class SortTest extends TableTestBase {
- private val streamUtil: StreamTableTestUtil = streamTestUtil()
- streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c,
- 'proctime.proctime, 'rowtime.rowtime)
-
- @Test
- def testSortProcessingTime() = {
-
- val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode("DataStreamSort",
- streamTableNode(0),
- term("orderBy", "proctime ASC", "c ASC")),
- term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c"))
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSortRowTime() = {
-
- val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode("DataStreamSort",
- streamTableNode(0),
- term("orderBy", "rowtime ASC, c ASC")),
- term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c"))
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- // test should fail because time order is descending
- @Test(expected = classOf[TableException])
- def testSortProcessingTimeDesc() = {
-
- val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
- streamUtil.verifySql(sqlQuery, "")
- }
-
-
- // test should fail because time is not the primary order field
- @Test(expected = classOf[TableException])
- def testSortProcessingTimeSecondaryField() = {
-
- val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
- streamUtil.verifySql(sqlQuery, "")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
deleted file mode 100644
index b883870..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.watermark.Watermark
-
-object TimeTestUtil {
-
- class EventTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
-
- override def cancel(): Unit = ???
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 58eedd0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
-import org.apache.flink.table.utils._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testCrossJoin(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
-
- // test overloading
-
- val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
- val expected2 = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c, '$')"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery2, expected2)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testCustomType(): Unit = {
- val util = streamTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testHierarchyType(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new HierarchyTableFunction
- util.addFunction("hierarchy", function)
-
- val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "hierarchy($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testPojoType(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new PojoTableFunc
- util.addFunction("pojo", function)
-
- val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "pojo($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " INTEGER age, VARCHAR(2147483647) name)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "name", "age")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testFilter(): Unit = {
- val util = streamTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
- "WHERE len > 2"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER"),
- term("condition", ">($1, 2)")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testScalarFunction(): Unit = {
- val util = streamTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamCorrelate",
- streamTableNode(0),
- term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-}