You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:36 UTC
[27/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
new file mode 100644
index 0000000..711182c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import java.sql.{Date, Time, Timestamp}
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.runtime.batch.table.OldHashCode
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class CalcITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testSelectStarFromTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectStarFromDataSet(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectWithNaming(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, foo FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnString(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnInteger(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterWithAnd(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+ "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAdvancedDataTypes(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
+ "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
+
+ val ds = env.fromElements((
+ Date.valueOf("1984-07-12"),
+ Time.valueOf("14:34:24"),
+ Timestamp.valueOf("1984-07-12 14:34:24")))
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
+ "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUserDefinedScalarFunction(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerFunction("hashCode", OldHashCode)
+ tEnv.registerFunction("hashCode", MyHashCode)
+
+ val ds = env.fromElements("a", "b", "c")
+ tEnv.registerDataSet("MyTable", ds, 'text)
+
+ val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+
+ val expected = "97\n98\n99"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
+
+object MyHashCode extends ScalarFunction {
+ def eval(s: String): Int = s.hashCode()
+}
+
+object CalcITCase {
+
+ @Parameterized.Parameters(name = "Table config = {0}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TableProgramsTestBase.DEFAULT),
+ Array(TableProgramsTestBase.NO_NULL)).asJava
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
new file mode 100644
index 0000000..681b4b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testJoin(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithMultipleKeys(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithAlias(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
+ "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDataSetJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFullOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testLeftOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table5 LEFT OUTER JOIN Table3 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testCrossJoinWithLeftSingleRowInput(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ tEnv.registerTable("A", table)
+
+ val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
+ val expected =
+ "3,1,1,Hi\n" +
+ "3,2,2,Hello\n" +
+ "3,3,2,Hello world"
+ val result = tEnv.sql(sqlQuery2).collect()
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testCrossJoinWithRightSingleRowInput(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ tEnv.registerTable("A", table)
+
+ val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A)"
+ val expected =
+ "1,1,Hi,3\n" +
+ "2,2,Hello,3\n" +
+ "3,2,Hello world,3"
+ val result = tEnv.sql(sqlQuery1).collect()
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testCrossJoinWithEmptySingleRowInput(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ tEnv.registerTable("A", table)
+
+ val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
+ val result = tEnv.sql(sqlQuery1).count()
+ Assert.assertEquals(0, result)
+ }
+
+ @Test
+ def testLeftNullRightJoin(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt " +
+ "FROM (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) RIGHT JOIN A ON a < cnt"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null",
+ "2,null", "2,null",
+ "3,null", "3,null", "3,null",
+ "4,null", "4,null", "4,null", "4,null",
+ "5,null", "5,null", "5,null", "5,null", "5,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+
+ @Test
+ def testLeftSingleRightJoinEqualPredicate(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt = a"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,3", "3,3",
+ "3,3", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt > a"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,3", "2,3", "2,3", "3,null", "3,null",
+ "3,null", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightNullLeftJoin(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt " +
+ "FROM A LEFT JOIN (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) ON cnt > a"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("A", ds2)
+ tEnv.registerTable("B", ds1)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = Seq(
+ "2,null", "3,null", "1,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinEqualPredicate(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt = a"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,3", "3,3",
+ "3,3", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt < a"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,null", "3,null",
+ "3,null", "4,3", "4,3", "4,3",
+ "4,3", "5,3", "5,3", "5,3",
+ "5,3", "5,3").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinTwoFields(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt, cnt2 " +
+ "FROM t1 LEFT JOIN (SELECT COUNT(*) AS cnt,COUNT(*) AS cnt2 FROM t2 ) AS x ON a = cnt"
+
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("t1", ds1)
+ tEnv.registerTable("t2", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null,null",
+ "2,null,null", "2,null,null",
+ "3,3,3", "3,3,3", "3,3,3",
+ "4,null,null", "4,null,null", "4,null,null", "4,null,null",
+ "5,null,null", "5,null,null", "5,null,null", "5,null,null", "5,null,null").mkString("\n")
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testCrossWithUnnest(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ (1, 1L, Array("Hi", "w")),
+ (2, 2L, Array("Hello", "k")),
+ (3, 2L, Array("Hello world", "x"))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataSet("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
+ val results = result.toDataSet[Row].collect().toList
+ assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
+ }
+
+ @Test
+ def testJoinWithUnnestOfTuple(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ (1, Array((12, "45.6"), (2, "45.612"))),
+ (2, Array((13, "41.6"), (1, "45.2136"))),
+ (3, Array((18, "42.6")))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataSet("T", stream, 'a, 'b)
+
+ val sqlQuery = "" +
+ "SELECT a, b, x, y " +
+ "FROM " +
+ " (SELECT a, b FROM T WHERE a < 3) as tf, " +
+ " UNNEST(tf.b) as A (x, y) " +
+ "WHERE x > a"
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = List(
+ "1,[(12,45.6), (2,45.612)],12,45.6",
+ "1,[(12,45.6), (2,45.612)],2,45.612",
+ "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ")
+ val results = result.toDataSet[Row].collect().map(_.toString)
+ assertEquals(expected, results.sorted.mkString(", "))
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
new file mode 100644
index 0000000..8a59feb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testUnionAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT f FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnion(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 UNION (SELECT f FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnionWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
+ "WHERE b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hallo\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnionWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT count(c) FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "18"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testExcept(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = env.fromElements((1, 1L, "Hi"))
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ @Ignore
+ // calcite sql parser doesn't support EXCEPT ALL
+ def testExceptAll(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 EXCEPT ALL SELECT c FROM t2"
+
+ val data1 = new mutable.MutableList[Int]
+ data1 += (1, 1, 1, 2, 2)
+ val data2 = new mutable.MutableList[Int]
+ data2 += (1, 2, 2, 3)
+ val ds1 = env.fromCollection(data1)
+ val ds2 = env.fromCollection(data2)
+
+ tEnv.registerDataSet("t1", ds1, 'c)
+ tEnv.registerDataSet("t2", ds2, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1\n1"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testExceptWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM (" +
+ "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
+ "WHERE b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testIntersect(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 INTERSECT SELECT c FROM t2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val data = new mutable.MutableList[(Int, Long, String)]
+ data.+=((1, 1L, "Hi"))
+ data.+=((2, 2L, "Hello"))
+ data.+=((2, 2L, "Hello"))
+ data.+=((3, 2L, "Hello world!"))
+ val ds2 = env.fromCollection(Random.shuffle(data))
+
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ @Ignore
+ // calcite sql parser doesn't support INTERSECT ALL
+ def testIntersectAll(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 INTERSECT ALL SELECT c FROM t2"
+
+ val data1 = new mutable.MutableList[Int]
+ data1 += (1, 1, 1, 2, 2)
+ val data2 = new mutable.MutableList[Int]
+ data2 += (1, 2, 2, 3)
+ val ds1 = env.fromCollection(data1)
+ val ds2 = env.fromCollection(data2)
+
+ tEnv.registerDataSet("t1", ds1, 'c)
+ tEnv.registerDataSet("t2", ds2, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1\n2\n2"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testIntersectWithFilter(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM ((SELECT * FROM t1) INTERSECT (SELECT * FROM t2)) WHERE a > 1"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get3TupleDataSet(env)
+
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
new file mode 100644
index 0000000..4672ec3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.SortTestUtils._
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
+ extends TableProgramsClusterTestBase(mode, configMode) {
+
+ private def getExecutionEnvironment = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ // set the parallelism explicitly to make sure the query is executed in
+ // a distributed manner
+ env.setParallelism(3)
+ env
+ }
+
+ @Test
+ def testOrderByMultipleFieldsWithSql(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
+ (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings)
+ // squash all rows inside a partition into one element
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
+
+ def rowOrdering = Ordering.by((r : Row) => {
+ // ordering for this tuple will fall into the previous defined tupleOrdering,
+ // so we just need to return the field by their defining sequence
+ (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+ })
+
+ val result = results
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(rowOrdering)
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByWithOffset(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
+ - x.productElement(0).asInstanceOf[Int] )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+ // squash all rows inside a partition into one element
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
+
+ val result = results.
+ filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByWithOffsetAndFetch(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
+
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int] )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+ // squash all rows inside a partition into one element
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
+
+ val result = results
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByLimit(): Unit = {
+ val env = getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
+
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
+ (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ // squash all rows inside a partition into one element
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+ // the rows need to be copied in object reuse mode
+ val copied = new mutable.ArrayBuffer[Row]
+ rows.foreach(r => copied += Row.copy(r))
+ Seq(copied)
+ }).collect()
+
+ def rowOrdering = Ordering.by((r : Row) => {
+ // ordering for this tuple will fall into the previous defined tupleOrdering,
+ // so we just need to return the field by their defining sequence
+ (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+ })
+
+ val result = results
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(rowOrdering)
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..b7f1bb1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
+
+ val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+
+ val expected = "15,65,12"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val t1 = ds.filter('a > 9)
+
+ tEnv.registerTable("MyTable", t1)
+
+ val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+
+ val expected = "16,60,12"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testMultipleSQLQueries(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
+ val result1 = tEnv.sql(sqlQuery)
+ tEnv.registerTable("ResTable", result1)
+
+ val sqlQuery2 = "SELECT count(aa) FROM ResTable"
+ val result2 = tEnv.sql(sqlQuery2)
+
+ val expected = "6"
+ val results = result2.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectWithCompositeType(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT MyTable.a2, MyTable.a1._2 FROM MyTable"
+
+ val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello,true\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
new file mode 100644
index 0000000..504ab90
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase}
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableSourceITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testCsvTableSource(): Unit = {
+
+ val csvTable = CommonTestData.getCsvTableSource
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerTableSource("csvTable", csvTable)
+ val results = tEnv.sql(
+ "SELECT id, `first`, `last`, score FROM csvTable").collect()
+
+ val expected = Seq(
+ "1,Mike,Smith,12.3",
+ "2,Bob,Taylor,45.6",
+ "3,Sam,Miller,7.89",
+ "4,Peter,Smith,0.12",
+ "5,Liz,Williams,34.5",
+ "6,Sally,Miller,6.78",
+ "7,Alice,Smith,90.1",
+ "8,Kelly,Williams,2.34").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testNested(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+ val nestedTable = CommonTestData.getNestedTableSource
+
+ tableEnv.registerTableSource("NestedPersons", nestedTable)
+
+ val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+ "NestedPersons.address.street, NestedPersons.address.city AS city " +
+ "FROM NestedPersons " +
+ "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
+
+ val expected = "Bob,Taylor,Pearse Street,Dublin"
+
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
new file mode 100644
index 0000000..457142c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(
+ configMode: TableConfigMode)
+ extends TableProgramsCollectionTestBase(configMode) {
+
+ @Test
+ def testAggregationTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ .select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
+
+ val results = t.toDataSet[Row].collect()
+ val expected = "231,231,1,21,21,11"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testWorkingAggregationDataTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+ .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+
+ val expected = "1,1,1,1,1.5,1.5,2"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements(
+ (1: Byte, 1: Short),
+ (2: Byte, 2: Short)).toTable(tEnv)
+ .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+
+ val expected = "1,3,2,1,3"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAggregationWithArithmetic(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+ .select(('_1 + 2).avg + 2, '_2.count + 5)
+
+ val expected = "5.5,7"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAggregationWithTwoCount(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+ .select('_1.count, '_2.count)
+
+ val expected = "2,2"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAggregationAfterProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+ .select('_1, '_2, '_3)
+ .select('_1.avg, '_2.sum, '_3.count)
+
+ val expected = "1,3,2"
+ val result = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testSQLStyleAggregations(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .select(
+ """Sum( a) as a1, a.sum as a2,
+ |Min (a) as b1, a.min as b2,
+ |Max (a ) as c1, a.max as c2,
+ |Avg ( a ) as d1, a.avg as d2,
+ |Count(a) as e1, a.count as e2
+ """.stripMargin)
+
+ val expected = "231,231,1,1,21,21,11,11,21,21"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testPojoAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val input = env.fromElements(
+ WC("hello", 1),
+ WC("hello", 1),
+ WC("ciao", 1),
+ WC("hola", 1),
+ WC("hola", 1))
+ val expr = input.toTable(tEnv)
+ val result = expr
+ .groupBy('word)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
+ .toDataSet[WC]
+
+ val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect()
+ val expected = "(hello,20)\n" + "(hola,20)"
+ TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
+ }
+
+ @Test
+ def testDistinct(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val distinct = ds.select('b).distinct()
+
+ val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ val results = distinct.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ val distinct = ds.groupBy('a, 'e).select('e).distinct()
+
+ val expected = "1\n" + "2\n" + "3\n"
+ val results = distinct.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val countFun = new CountAggFunction
+ val wAvgFun = new WeightedAvgWithMergeAndReset
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum, countFun('c), wAvgFun('b, 'a), wAvgFun('a, 'a))
+
+ val expected = "1,1,1,1,1\n" + "2,5,2,2,2\n" + "3,15,3,3,5\n" + "4,34,4,4,8\n" +
+ "5,65,5,5,13\n" + "6,111,6,6,18\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupingKeyForwardIfNotUsed(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum)
+
+ val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupNoAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum as 'd, 'b)
+ .groupBy('b, 'd)
+ .select('b)
+
+ val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithLongKeys(): Unit = {
+ // This uses very long keys to force serialized comparison.
+ // With short keys, the normalized key is sufficient.
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = env.fromElements(
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+ ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
+ .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('a, 'b)
+ .select('c.sum)
+
+ val expected = "10\n" + "8\n"
+ val results = ds.collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant1(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 4 as 'four, 'b)
+ .groupBy('four, 'a)
+ .select('four, 'b.sum)
+
+ val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
+ "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
+ "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
+ val results = t.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithConstant2(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('b, 4 as 'four, 'a)
+ .groupBy('b, 'four)
+ .select('four, 'a.sum)
+
+ val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithExpression(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+
+ val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
+ "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testGroupedAggregateWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+ .where('b === 2)
+
+ val expected = "2,5\n"
+ val results = t.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAnalyticAggregation(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, BigDecimal.ONE),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, new BigDecimal(2))).toTable(tEnv)
+ val res = ds.select(
+ '_1.stddevPop, '_2.stddevPop, '_3.stddevPop, '_4.stddevPop, '_5.stddevPop,
+ '_6.stddevPop, '_7.stddevPop,
+ '_1.stddevSamp, '_2.stddevSamp, '_3.stddevSamp, '_4.stddevSamp, '_5.stddevSamp,
+ '_6.stddevSamp, '_7.stddevSamp,
+ '_1.varPop, '_2.varPop, '_3.varPop, '_4.varPop, '_5.varPop,
+ '_6.varPop, '_7.varPop,
+ '_1.varSamp, '_2.varSamp, '_3.varSamp, '_4.varSamp, '_5.varSamp,
+ '_6.varSamp, '_7.varSamp)
+ val expected =
+ "0,0,0," +
+ "0,0.5,0.5,0.5," +
+ "1,1,1," +
+ "1,0.70710677,0.7071067811865476,0.7071067811865476," +
+ "0,0,0," +
+ "0,0.25,0.25,0.25," +
+ "1,1,1," +
+ "1,0.5,0.5,0.5"
+ val results = res.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
+
+case class WC(word: String, frequency: Long)