You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:34 UTC
[25/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..2e23161
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testSimpleRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds)
+ val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRegisterWithFields(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+ val t = tEnv.scan(tableName).select('a, 'b)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+ "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableRegister(): Unit = {
+
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable(tableName, t)
+
+ val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+ val expected = "9,4\n" + "10,4\n" +
+ "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+ "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = regT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTable(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "Peter,28,4000.0,Sales\n" +
+ "Anna,56,10000.0,Engineering\n" +
+ "Lucy,42,6000.0,HR\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testToTableFromAndToCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+ SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+ SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+ val t = env.fromCollection(data)
+ .toTable(tEnv, 'a, 'b, 'c, 'd)
+ .select('a, 'b, 'c, 'd)
+
+ val expected: String =
+ "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+ "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+ "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+ val results = t.toDataSet[SomeCaseClass].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+}
+
+object TableEnvironmentITCase {
+
+ @Parameterized.Parameters(name = "Table config = {0}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TableProgramsTestBase.DEFAULT)
+ ).asJava
+ }
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+ def this() { this("", 0, 0.0, "") }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
new file mode 100644
index 0000000..d8ba29a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import java.io.File
+
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class TableSinkITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testBatchTableSink(): Unit = {
+
+ val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+ tmpFile.deleteOnExit()
+ val path = tmpFile.toURI.toString
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ env.setParallelism(4)
+
+ val input = CollectionDataSets.get3TupleDataSet(env)
+ .map(x => x).setParallelism(4) // increase DOP to 4
+
+ val results = input.toTable(tEnv, 'a, 'b, 'c)
+ .where('a < 5 || 'a > 17)
+ .select('c, 'b)
+ .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
+
+ env.execute()
+
+ val expected = Seq(
+ "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
+ "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n")
+
+ TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
new file mode 100644
index 0000000..5e214b1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase}
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.TestFilterableTableSource
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableSourceITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testCsvTableSourceWithProjection(): Unit = {
+ val csvTable = CommonTestData.getCsvTableSource
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerTableSource("csvTable", csvTable)
+
+ val results = tEnv
+ .scan("csvTable")
+ .where('score < 20)
+ .select('last, 'id.floor(), 'score * 2)
+ .collect()
+
+ val expected = Seq(
+ "Smith,1,24.6",
+ "Miller,3,15.78",
+ "Smith,4,0.24",
+ "Miller,6,13.56",
+ "Williams,8,4.68").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableSourceWithFilterable(): Unit = {
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+ tableEnv.registerTableSource(tableName, new TestFilterableTableSource)
+ val results = tableEnv
+ .scan(tableName)
+ .where("amount > 4 && price < 9")
+ .select("id, name")
+ .collect()
+
+ val expected = Seq(
+ "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/AggregationsITCase.scala
deleted file mode 100644
index eb516e7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/AggregationsITCase.scala
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import java.sql.Timestamp
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testAggregationTypes(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "231,1,21,21,11"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT sum(_1) FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "231"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDataSetAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT sum(_1) FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "231"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAggregationDataTypes(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g), " +
- "min(g), min('Ciao'), max(g), max('Ciao'), sum(CAST(f AS DECIMAL)) FROM MyTable"
-
- val ds = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableProjection(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
- "FROM MyTable"
-
- val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,3,2,1,3"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableAggregationWithArithmetic(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
- "FROM MyTable"
-
- val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "5.5,7"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAggregationWithTwoCount(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
-
- val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "2,2"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testAggregationAfterProjection(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
- "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
-
- val ds = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,3,2"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDistinctAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "231,21"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testGroupedDistinctAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected =
- "6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testGroupingSetAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery =
- "SELECT _2, _3, avg(_1) as a, GROUP_ID() as g FROM MyTable GROUP BY GROUPING SETS (_2, _3)"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
-
- val expected =
- "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
- "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
- "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" +
- "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" +
- "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" +
- "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" +
- "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" +
- "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"
-
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- @Test
- def testAggregateEmptyDataSets(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT avg(a), sum(a), count(b) " +
- "FROM MyTable where a = 4 group by a"
-
- val sqlQuery2 = "SELECT avg(a), sum(a), count(b) " +
- "FROM MyTable where a = 4"
-
- val sqlQuery3 = "SELECT avg(a), sum(a), count(b) " +
- "FROM MyTable"
-
- val ds = env.fromElements(
- (1: Byte, 1: Short),
- (2: Byte, 2: Short))
- .toTable(tEnv, 'a, 'b)
-
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
- val result2 = tEnv.sql(sqlQuery2)
- val result3 = tEnv.sql(sqlQuery3)
-
- val results = result.toDataSet[Row].collect()
- val expected = Seq.empty
- val results2 = result2.toDataSet[Row].collect()
- val expected2 = "null,null,0"
- val results3 = result3.toDataSet[Row].collect()
- val expected3 = "1,3,2"
-
- assert(results.equals(expected),
- "Empty result is expected for grouped set, but actual: " + results)
- TestBaseUtils.compareResultAsText(results2.asJava, expected2)
- TestBaseUtils.compareResultAsText(results3.asJava, expected3)
- }
-
- @Test
- def testTumbleWindowAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.registerFunction("countFun", new CountAggFunction)
- tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
-
- val sqlQuery =
- "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" +
- "FROM T " +
- "GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- // create timestamps
- .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
- tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
-
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- val expected = Seq(
- "1,1,1,1,1",
- "2,2,1,2,2", "2,3,1,2,3",
- "3,9,2,3,4", "3,6,1,3,6",
- "4,15,2,4,7", "4,19,2,4,9",
- "5,11,1,5,11", "5,39,3,5,13", "5,15,1,5,15",
- "6,33,2,6,16", "6,57,3,6,19", "6,21,1,6,21"
- ).mkString("\n")
-
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- @Test
- def testHopWindowAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.registerFunction("countFun", new CountAggFunction)
- tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
-
- env.setParallelism(1)
-
- val sqlQuery =
- "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" +
- "FROM T " +
- "GROUP BY b, HOP(ts, INTERVAL '2' SECOND, INTERVAL '4' SECOND)"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- // create timestamps
- .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
- tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
-
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- val expected = Seq(
- "1,1,1,1,1","1,1,1,1,1",
- "2,5,2,2,2","2,5,2,2,2",
- "3,9,2,3,4", "3,15,3,3,5", "3,6,1,3,6",
- "4,7,1,4,7", "4,24,3,4,8", "4,27,3,4,9", "4,10,1,4,10",
- "5,11,1,5,11", "5,36,3,5,12", "5,54,4,5,13", "5,29,2,5,14",
- "6,33,2,6,16", "6,70,4,6,17", "6,78,4,6,19", "6,41,2,6,20"
- ).mkString("\n")
-
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- @Test
- def testSessionWindowAggregate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.registerFunction("countFun", new CountAggFunction)
- tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset)
-
- env.setParallelism(1)
-
- val sqlQuery =
- "SELECT MIN(a), MAX(a), SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), " +
- "wAvgWithMergeAndReset(a, a)" +
- "FROM T " +
- "GROUP BY SESSION(ts, INTERVAL '4' SECOND)"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- // create timestamps
- .filter(x => (x._2 % 2) == 0)
- .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
- tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
-
- val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- val expected = Seq(
- "2,10,39,6,3,7",
- "16,21,111,6,6,18"
- ).mkString("\n")
-
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/CalcITCase.scala
deleted file mode 100644
index 926dc43..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/CalcITCase.scala
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.dataset.table.OldHashCode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CalcITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testSelectStarFromTable(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSelectStarFromDataSet(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSimpleSelectAll(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT a, b, c FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
- "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
- "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
- "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSelectWithNaming(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
- "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
- "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test(expected = classOf[ValidationException])
- def testInvalidFields(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT a, foo FROM MyTable"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- tEnv.sql(sqlQuery)
- }
-
- @Test
- def testAllRejectingFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE false"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE true"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnString(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterOnInteger(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
- "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
- "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDisjunctivePredicate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFilterWithAnd(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
- "9,4,Comment#3\n" + "17,6,Comment#11\n" +
- "19,6,Comment#13\n" + "21,6,Comment#15\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testAdvancedDataTypes(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
- "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
-
- val ds = env.fromElements((
- Date.valueOf("1984-07-12"),
- Time.valueOf("14:34:24"),
- Timestamp.valueOf("1984-07-12 14:34:24")))
- tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
- "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testUserDefinedScalarFunction(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- tEnv.registerFunction("hashCode", OldHashCode)
- tEnv.registerFunction("hashCode", MyHashCode)
-
- val ds = env.fromElements("a", "b", "c")
- tEnv.registerDataSet("MyTable", ds, 'text)
-
- val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
-
- val expected = "97\n98\n99"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-}
-
-object MyHashCode extends ScalarFunction {
- def eval(s: String): Int = s.hashCode()
-}
-
-object CalcITCase {
-
- @Parameterized.Parameters(name = "Table config = {0}")
- def parameters(): util.Collection[Array[java.lang.Object]] = {
- Seq[Array[AnyRef]](
- Array(TableProgramsTestBase.DEFAULT),
- Array(TableProgramsTestBase.NO_NULL)).asJava
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/JoinITCase.scala
deleted file mode 100644
index aad4b3b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/JoinITCase.scala
+++ /dev/null
@@ -1,521 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testJoin(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testJoinWithFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi,Hallo\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testJoinWithMultipleKeys(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
- "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testJoinWithAlias(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
- val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
- "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testDataSetJoinWithAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "6,6"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableJoinWithAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "6,6"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testFullOuterJoin(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.getConfig.setNullCheck(true)
-
- val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
- "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
- "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
- "null,IJK\n" + "null,JKL\n" + "null,KLM"
-
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testLeftOuterJoin(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.getConfig.setNullCheck(true)
-
- val sqlQuery = "SELECT c, g FROM Table5 LEFT OUTER JOIN Table3 ON b = e"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
- "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
- "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
- "null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRightOuterJoin(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- tEnv.getConfig.setNullCheck(true)
-
- val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
- tEnv.registerTable("Table3", ds1)
- tEnv.registerTable("Table5", ds2)
-
- val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
- "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
- "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
- "null,IJK\n" + "null,JKL\n" + "null,KLM"
- val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testCrossJoinWithLeftSingleRowInput(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
- tEnv.registerTable("A", table)
-
- val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
- val expected =
- "3,1,1,Hi\n" +
- "3,2,2,Hello\n" +
- "3,3,2,Hello world"
- val result = tEnv.sql(sqlQuery2).collect()
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- @Test
- def testCrossJoinWithRightSingleRowInput(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
- tEnv.registerTable("A", table)
-
- val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A)"
- val expected =
- "1,1,Hi,3\n" +
- "2,2,Hello,3\n" +
- "3,2,Hello world,3"
- val result = tEnv.sql(sqlQuery1).collect()
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
- @Test
- def testCrossJoinWithEmptySingleRowInput(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
- tEnv.registerTable("A", table)
-
- val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
- val result = tEnv.sql(sqlQuery1).count()
- Assert.assertEquals(0, result)
- }
-
- @Test
- def testLeftNullRightJoin(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt " +
- "FROM (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) RIGHT JOIN A ON a < cnt"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("A", ds1)
- tEnv.registerTable("B", ds2)
-
-
- val result = tEnv.sql(sqlQuery)
- val expected = Seq(
- "1,null",
- "2,null", "2,null",
- "3,null", "3,null", "3,null",
- "4,null", "4,null", "4,null", "4,null",
- "5,null", "5,null", "5,null", "5,null", "5,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testLeftSingleRightJoinEqualPredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt = a"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("A", ds1)
- tEnv.registerTable("B", ds2)
-
- val result = tEnv.sql(sqlQuery)
- val expected = Seq(
- "1,null", "2,null", "2,null", "3,3", "3,3",
- "3,3", "4,null", "4,null", "4,null",
- "4,null", "5,null", "5,null", "5,null",
- "5,null", "5,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt > a"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("A", ds1)
- tEnv.registerTable("B", ds2)
-
- val result = tEnv.sql(sqlQuery)
- val expected = Seq(
- "1,3", "2,3", "2,3", "3,null", "3,null",
- "3,null", "4,null", "4,null", "4,null",
- "4,null", "5,null", "5,null", "5,null",
- "5,null", "5,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRightNullLeftJoin(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt " +
- "FROM A LEFT JOIN (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) ON cnt > a"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("A", ds2)
- tEnv.registerTable("B", ds1)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = Seq(
- "2,null", "3,null", "1,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRightSingleLeftJoinEqualPredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt = a"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("A", ds1)
- tEnv.registerTable("B", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = Seq(
- "1,null", "2,null", "2,null", "3,3", "3,3",
- "3,3", "4,null", "4,null", "4,null",
- "4,null", "5,null", "5,null", "5,null",
- "5,null", "5,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt < a"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("A", ds1)
- tEnv.registerTable("B", ds2)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = Seq(
- "1,null", "2,null", "2,null", "3,null", "3,null",
- "3,null", "4,3", "4,3", "4,3",
- "4,3", "5,3", "5,3", "5,3",
- "5,3", "5,3").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
-
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testRightSingleLeftJoinTwoFields(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val sqlQuery =
- "SELECT a, cnt, cnt2 " +
- "FROM t1 LEFT JOIN (SELECT COUNT(*) AS cnt,COUNT(*) AS cnt2 FROM t2 ) AS x ON a = cnt"
-
- val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
- tEnv.registerTable("t1", ds1)
- tEnv.registerTable("t2", ds2)
-
- val result = tEnv.sql(sqlQuery)
- val expected = Seq(
- "1,null,null",
- "2,null,null", "2,null,null",
- "3,3,3", "3,3,3", "3,3,3",
- "4,null,null", "4,null,null", "4,null,null", "4,null,null",
- "5,null,null", "5,null,null", "5,null,null", "5,null,null", "5,null,null").mkString("\n")
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testCrossWithUnnest(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val data = List(
- (1, 1L, Array("Hi", "w")),
- (2, 2L, Array("Hello", "k")),
- (3, 2L, Array("Hello world", "x"))
- )
- val stream = env.fromCollection(data)
- tEnv.registerDataSet("T", stream, 'a, 'b, 'c)
-
- val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
- val results = result.toDataSet[Row].collect().toList
- assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
- }
-
- @Test
- def testJoinWithUnnestOfTuple(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val data = List(
- (1, Array((12, "45.6"), (2, "45.612"))),
- (2, Array((13, "41.6"), (1, "45.2136"))),
- (3, Array((18, "42.6")))
- )
- val stream = env.fromCollection(data)
- tEnv.registerDataSet("T", stream, 'a, 'b)
-
- val sqlQuery = "" +
- "SELECT a, b, x, y " +
- "FROM " +
- " (SELECT a, b FROM T WHERE a < 3) as tf, " +
- " UNNEST(tf.b) as A (x, y) " +
- "WHERE x > a"
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = List(
- "1,[(12,45.6), (2,45.612)],12,45.6",
- "1,[(12,45.6), (2,45.612)],2,45.612",
- "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ")
- val results = result.toDataSet[Row].collect().map(_.toString)
- assertEquals(expected, results.sorted.mkString(", "))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SetOperatorsITCase.scala
deleted file mode 100644
index 631ce2b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SetOperatorsITCase.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testUnionAll(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT f FROM t2)"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testUnion(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 UNION (SELECT f FROM t2)"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi\n" + "Hello\n" + "Hello world\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testUnionWithFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM (" +
- "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
- "WHERE b < 2"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi\n" + "Hallo\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testUnionWithAggregation(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT count(c) FROM (" +
- "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "18"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testExcept(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = env.fromElements((1, 1L, "Hi"))
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hello\n" + "Hello world\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- @Ignore
- // calcite sql parser doesn't support EXCEPT ALL
- def testExceptAll(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 EXCEPT ALL SELECT c FROM t2"
-
- val data1 = new mutable.MutableList[Int]
- data1 += (1, 1, 1, 2, 2)
- val data2 = new mutable.MutableList[Int]
- data2 += (1, 2, 2, 3)
- val ds1 = env.fromCollection(data1)
- val ds2 = env.fromCollection(data2)
-
- tEnv.registerDataSet("t1", ds1, 'c)
- tEnv.registerDataSet("t2", ds2, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1\n1"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testExceptWithFilter(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM (" +
- "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
- "WHERE b < 2"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get5TupleDataSet(env)
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testIntersect(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 INTERSECT SELECT c FROM t2"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Hi"))
- data.+=((2, 2L, "Hello"))
- data.+=((2, 2L, "Hello"))
- data.+=((3, 2L, "Hello world!"))
- val ds2 = env.fromCollection(Random.shuffle(data))
-
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hi\n" + "Hello\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- @Ignore
- // calcite sql parser doesn't support INTERSECT ALL
- def testIntersectAll(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM t1 INTERSECT ALL SELECT c FROM t2"
-
- val data1 = new mutable.MutableList[Int]
- data1 += (1, 1, 1, 2, 2)
- val data2 = new mutable.MutableList[Int]
- data2 += (1, 2, 2, 3)
- val ds1 = env.fromCollection(data1)
- val ds2 = env.fromCollection(data2)
-
- tEnv.registerDataSet("t1", ds1, 'c)
- tEnv.registerDataSet("t2", ds2, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "1\n2\n2"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testIntersectWithFilter(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT c FROM ((SELECT * FROM t1) INTERSECT (SELECT * FROM t2)) WHERE a > 1"
-
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
- val ds2 = CollectionDataSets.get3TupleDataSet(env)
-
- tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
- tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hello\n" + "Hello world\n"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SortITCase.scala
deleted file mode 100644
index f06517a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/SortITCase.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
- extends TableProgramsClusterTestBase(mode, configMode) {
-
- private def getExecutionEnvironment = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- // set the parallelism explicitly to make sure the query is executed in
- // a distributed manner
- env.setParallelism(3)
- env
- }
-
- @Test
- def testOrderByMultipleFieldsWithSql(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
-
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val expected = sortExpectedly(tupleDataSetStrings)
- // squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- def rowOrdering = Ordering.by((r : Row) => {
- // ordering for this tuple will fall into the previous defined tupleOrdering,
- // so we just need to return the field by their defining sequence
- (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
- })
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(rowOrdering)
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByWithOffset(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
-
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- - x.productElement(0).asInstanceOf[Int] )
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
- // squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results.
- filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByWithOffsetAndFetch(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
-
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
- // squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
- @Test
- def testOrderByLimit(): Unit = {
- val env = getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
-
- implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds)
-
- val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
- // squash all rows inside a partition into one element
- val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
- // the rows need to be copied in object reuse mode
- val copied = new mutable.ArrayBuffer[Row]
- rows.foreach(r => copied += Row.copy(r))
- Seq(copied)
- }).collect()
-
- def rowOrdering = Ordering.by((r : Row) => {
- // ordering for this tuple will fall into the previous defined tupleOrdering,
- // so we just need to return the field by their defining sequence
- (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
- })
-
- val result = results
- .filterNot(_.isEmpty)
- // sort all partitions by their head element to verify the order across partitions
- .sortBy(_.head)(rowOrdering)
- .reduceLeft(_ ++ _)
-
- TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableSourceITCase.scala
deleted file mode 100644
index e7db602..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableSourceITCase.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableSourceITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testCsvTableSource(): Unit = {
-
- val csvTable = CommonTestData.getCsvTableSource
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- tEnv.registerTableSource("csvTable", csvTable)
- val results = tEnv.sql(
- "SELECT id, `first`, `last`, score FROM csvTable").collect()
-
- val expected = Seq(
- "1,Mike,Smith,12.3",
- "2,Bob,Taylor,45.6",
- "3,Sam,Miller,7.89",
- "4,Peter,Smith,0.12",
- "5,Liz,Williams,34.5",
- "6,Sally,Miller,6.78",
- "7,Alice,Smith,90.1",
- "8,Kelly,Williams,2.34").mkString("\n")
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testNested(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv = TableEnvironment.getTableEnvironment(env, config)
- val nestedTable = CommonTestData.getNestedTableSource
-
- tableEnv.registerTableSource("NestedPersons", nestedTable)
-
- val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
- "NestedPersons.address.street, NestedPersons.address.city AS city " +
- "FROM NestedPersons " +
- "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
-
- val expected = "Bob,Taylor,Pearse Street,Dublin"
-
- TestBaseUtils.compareResultAsText(result.asJava, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableWithSQLITCase.scala
deleted file mode 100644
index b31c1fb..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/sql/TableWithSQLITCase.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.dataset.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableWithSQLITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
-
- @Test
- def testSQLTable(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
- val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
-
- val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
-
- val expected = "15,65,12"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableSQLTable(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val t1 = ds.filter('a > 9)
-
- tEnv.registerTable("MyTable", t1)
-
- val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
-
- val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
-
- val expected = "16,60,12"
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testMultipleSQLQueries(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
- val result1 = tEnv.sql(sqlQuery)
- tEnv.registerTable("ResTable", result1)
-
- val sqlQuery2 = "SELECT count(aa) FROM ResTable"
- val result2 = tEnv.sql(sqlQuery2)
-
- val expected = "6"
- val results = result2.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSelectWithCompositeType(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val sqlQuery = "SELECT MyTable.a2, MyTable.a1._2 FROM MyTable"
-
- val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
- tEnv.registerTable("MyTable", ds)
-
- val result = tEnv.sql(sqlQuery)
-
- val expected = "Hello,true\n"
-
- val results = result.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-}