You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:35 UTC
[06/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
new file mode 100644
index 0000000..0b2c8fc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableSourceITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testBatchTableSourceTableAPI(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
+ val results = tEnv
+ .scan("MyTestTable")
+ .where('amount < 4)
+ .select('amount * 'id, 'name)
+ .collect()
+
+ val expected = Seq(
+ "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
+ "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testBatchTableSourceSQL(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
+ val results = tEnv.sql(
+ "SELECT amount * id, name FROM MyTestTable WHERE amount < 4").collect()
+
+ val expected = Seq(
+ "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
+ "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testCsvTableSource(): Unit = {
+
+ val csvRecords = Seq(
+ "First#Id#Score#Last",
+ "Mike#1#12.3#Smith",
+ "Bob#2#45.6#Taylor",
+ "Sam#3#7.89#Miller",
+ "Peter#4#0.12#Smith",
+ "% Just a comment",
+ "Liz#5#34.5#Williams",
+ "Sally#6#6.78#Miller",
+ "Alice#7#90.1#Smith",
+ "Kelly#8#2.34#Williams"
+ )
+
+ val tempFile = File.createTempFile("csv-test", "tmp")
+ tempFile.deleteOnExit()
+ val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
+ tmpWriter.write(csvRecords.mkString("$"))
+ tmpWriter.close()
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val csvTable = new CsvTableSource(
+ tempFile.getAbsolutePath,
+ Array("first", "id", "score", "last"),
+ Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ ),
+ fieldDelim = "#",
+ rowDelim = "$",
+ ignoreFirstLine = true,
+ ignoreComments = "%"
+ )
+
+ tEnv.registerTableSource("csvTable", csvTable)
+ val results = tEnv.sql(
+ "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
+
+ val expected = Seq(
+ "Smith,102.52,7",
+ "Taylor,45.6,2",
+ "Miller,14.67,6",
+ "Williams,36.84,8").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+}
+
+class TestBatchTableSource extends BatchTableSource[Row] {
+
+ val fieldTypes: Array[TypeInformation[_]] = Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ )
+
+ /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+ override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+ execEnv.createInput(new GeneratingInputFormat(33), getReturnType).setParallelism(1)
+ }
+
+ /** Returns the types of the table fields. */
+ override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+ /** Returns the names of the table fields. */
+ override def getFieldsNames: Array[String] = Array("name", "id", "amount")
+
+ /** Returns the [[TypeInformation]] for the return type. */
+ override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
+
+ /** Returns the number of fields of the table. */
+ override def getNumberOfFields: Int = 3
+}
+
+class GeneratingInputFormat(val num: Int) extends GenericInputFormat[Row] {
+
+ var cnt = 0L
+
+ override def reachedEnd(): Boolean = cnt >= num
+
+ override def nextRecord(reuse: Row): Row = {
+ reuse.setField(0, s"Record_$cnt")
+ reuse.setField(1, cnt)
+ reuse.setField(2, (cnt % 16).toInt)
+ cnt += 1
+ reuse
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
new file mode 100644
index 0000000..4f55bee
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testAggregationTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231,1,21,21,11"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDataSetAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT sum(_1) FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "231"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testWorkingAggregationDataTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery =
+ "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7), " +
+ " sum(CAST(_6 AS DECIMAL))" +
+ "FROM MyTable"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
+ tEnv.registerDataSet("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,1,1,1.5,1.5,2,3.0"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableWorkingAggregationDataTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
+ "FROM MyTable"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,1,1,1.5,1.5,2"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
+ "FROM MyTable"
+
+ val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,3,2,1,3"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableAggregationWithArithmetic(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
+ "FROM MyTable"
+
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "5.5,7"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAggregationWithTwoCount(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
+
+ val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+
+ @Test
+ def testAggregationAfterProjection(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
+ "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+ (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,3,2"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testDistinctAggregate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ // must fail. distinct aggregates are not supported
+ tEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGroupedDistinctAggregate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ // must fail. distinct aggregates are not supported
+ tEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGroupingSetAggregate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT _2, _3, avg(_1) as a FROM MyTable GROUP BY GROUPING SETS (_2, _3)"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ // must fail. grouping sets are not supported
+ tEnv.sql(sqlQuery).toDataSet[Row]
+ }
+
+ @Test
+ def testAggregateEmptyDataSets(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT avg(a), sum(a), count(b) " +
+ "FROM MyTable where a = 4 group by a"
+
+ val sqlQuery2 = "SELECT avg(a), sum(a), count(b) " +
+ "FROM MyTable where a = 4"
+
+ val sqlQuery3 = "SELECT avg(a), sum(a), count(b) " +
+ "FROM MyTable"
+
+ val ds = env.fromElements(
+ (1: Byte, 1: Short),
+ (2: Byte, 2: Short))
+ .toTable(tEnv, 'a, 'b)
+
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+ val result2 = tEnv.sql(sqlQuery2)
+ val result3 = tEnv.sql(sqlQuery3)
+
+ val results = result.toDataSet[Row].collect()
+ val expected = Seq.empty
+ val results2 = result2.toDataSet[Row].collect()
+ val expected2 = "null,null,0"
+ val results3 = result3.toDataSet[Row].collect()
+ val expected3 = "1,3,2"
+
+ assert(results.equals(expected),
+ "Empty result is expected for grouped set, but actual: " + results)
+ TestBaseUtils.compareResultAsText(results2.asJava, expected2)
+ TestBaseUtils.compareResultAsText(results3.asJava, expected3)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
new file mode 100644
index 0000000..f3f554b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+
+import java.sql.{Date, Time, Timestamp}
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.sql.FilterITCase.MyHashCode
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class CalcITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testSelectStarFromTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectStarFromDataSet(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSimpleSelectAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectWithNaming(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, foo FROM MyTable"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnString(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterOnInteger(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+ "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+ "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+ "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testDisjunctivePredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFilterWithAnd(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+ "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+ "19,6,Comment#13\n" + "21,6,Comment#15\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testAdvancedDataTypes(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
+ "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
+
+ val ds = env.fromElements((
+ Date.valueOf("1984-07-12"),
+ Time.valueOf("14:34:24"),
+ Timestamp.valueOf("1984-07-12 14:34:24")))
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
+ "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUserDefinedScalarFunction(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ tEnv.registerFunction("hashCode",
+ new org.apache.flink.table.api.java.batch.table.CalcITCase.OldHashCode)
+ tEnv.registerFunction("hashCode", MyHashCode)
+
+ val ds = env.fromElements("a", "b", "c")
+ tEnv.registerDataSet("MyTable", ds, 'text)
+
+ val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+
+ val expected = "97\n98\n99"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
+
+object FilterITCase {
+ object MyHashCode extends ScalarFunction {
+ def eval(s: String): Int = s.hashCode()
+ }
+}
+
+object CalcITCase {
+
+ @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+ Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
new file mode 100644
index 0000000..344428b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithJoinFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello world, how are you?,Hallo Welt wie\n" +
+ "I am fine.,Hallo Welt wie\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testJoinWithMultipleKeys(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testJoinNonExistingKey(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testJoinNonMatchingKeyTypes(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testJoinWithAmbiguousFields(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery)
+ }
+
+ @Test
+ def testJoinWithAlias(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
+ "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testJoinNoEqualityPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test
+ def testDataSetJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableJoinWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "6,6"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testFullOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testLeftOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table5 LEFT OUTER JOIN Table3 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
+ "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
+ "null,IJK\n" + "null,JKL\n" + "null,KLM"
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testCrossJoin(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ val table2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('b1, 'b2, 'b3)
+ tEnv.registerTable("A", table1)
+ tEnv.registerTable("B", table2)
+
+ val sqlQuery = "SELECT a1, b1 FROM A CROSS JOIN B"
+ tEnv.sql(sqlQuery).count
+ }
+
+ @Test
+ def testCrossJoinWithLeftSingleRowInput(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ tEnv.registerTable("A", table)
+
+ val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
+ val expected =
+ "3,1,1,Hi\n" +
+ "3,2,2,Hello\n" +
+ "3,3,2,Hello world"
+ val result = tEnv.sql(sqlQuery2).collect()
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testCrossJoinWithRightSingleRowInput(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+ tEnv.registerTable("A", table)
+
+ val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A)"
+ val expected =
+ "1,1,Hi,3\n" +
+ "2,2,Hello,3\n" +
+ "3,2,Hello world,3"
+ val result = tEnv.sql(sqlQuery1).collect()
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
new file mode 100644
index 0000000..cc44e7a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsITCase.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testUnionAll(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT f FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnion(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 UNION (SELECT f FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnionWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
+ "WHERE b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hallo\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testUnionWithAggregation(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT count(c) FROM (" +
+ "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "18"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testExcept(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = env.fromElements((1, 1L, "Hi"))
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ @Ignore
+ // calcite sql parser doesn't support EXCEPT ALL
+ def testExceptAll(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 EXCEPT ALL SELECT c FROM t2"
+
+ val data1 = new mutable.MutableList[Int]
+ data1 += (1, 1, 1, 2, 2)
+ val data2 = new mutable.MutableList[Int]
+ data2 += (1, 2, 2, 3)
+ val ds1 = env.fromCollection(data1)
+ val ds2 = env.fromCollection(data2)
+
+ tEnv.registerDataSet("t1", ds1, 'c)
+ tEnv.registerDataSet("t2", ds2, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1\n1"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testExceptWithFilter(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM (" +
+ "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
+ "WHERE b < 2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env)
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testIntersect(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 INTERSECT SELECT c FROM t2"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val data = new mutable.MutableList[(Int, Long, String)]
+ data.+=((1, 1L, "Hi"))
+ data.+=((2, 2L, "Hello"))
+ data.+=((2, 2L, "Hello"))
+ data.+=((3, 2L, "Hello world!"))
+ val ds2 = env.fromCollection(Random.shuffle(data))
+
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hi\n" + "Hello\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ @Ignore
+ // calcite sql parser doesn't support INTERSECT ALL
+ def testIntersectAll(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM t1 INTERSECT ALL SELECT c FROM t2"
+
+ val data1 = new mutable.MutableList[Int]
+ data1 += (1, 1, 1, 2, 2)
+ val data2 = new mutable.MutableList[Int]
+ data2 += (1, 2, 2, 3)
+ val ds1 = env.fromCollection(data1)
+ val ds2 = env.fromCollection(data2)
+
+ tEnv.registerDataSet("t1", ds1, 'c)
+ tEnv.registerDataSet("t2", ds2, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "1\n2\n2"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testIntersectWithFilter(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT c FROM ((SELECT * FROM t1) INTERSECT (SELECT * FROM t2)) WHERE a > 1"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+ val ds2 = CollectionDataSets.get3TupleDataSet(env)
+
+ tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+ tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello\n" + "Hello world\n"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..6c07c6e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase {
+
+ @Test
+ def testExists(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
+ util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "b_long")
+ ),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a_long")
+ ),
+ term("groupBy", "a_long"),
+ term("select", "a_long")
+ ),
+ term("where", "=(a_long, b_long)"),
+ term("join", "b_long", "a_long"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "true AS $f0", "a_long")
+ ),
+ term("groupBy", "a_long"),
+ term("select", "a_long", "MIN($f0) AS $f1")
+ ),
+ term("where", "=(a_long, a_long0)"),
+ term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
+ term("joinType", "InnerJoin")
+ ),
+ term("select", "a_int", "a_string")
+ )
+
+ util.verifySql(
+ "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
+ expected
+ )
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
new file mode 100644
index 0000000..ecc685d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SingleRowJoinTest extends TableTestBase {
+
+ @Test
+ def testSingleRowEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 = cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union","a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "=(CAST(a1), cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowNotEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 < cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union", "a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "<(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowJoinWithComplexPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long)]("A", 'a1, 'a2)
+ util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+ val query =
+ "SELECT a1, a2, b1, b2 " +
+ "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+ "WHERE a1 < b1 AND a2 = b2"
+
+ val expected = binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ batchTableNode(1),
+ tuples(List(null, null)),
+ term("values", "b1", "b2")
+ ),
+ term("union","b1","b2")
+ ),
+ term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+ ),
+ term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+ term("join", "a1", "a2", "b1", "b2"),
+ term("joinType", "NestedLoopJoin")
+ )
+
+ util.verifySql(query, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
new file mode 100644
index 0000000..0f46e9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testOrderByMultipleFieldsWithSql(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ (- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results
+ .filterNot(_.isEmpty)
+ .sortBy(_.head)(Ordering.by(f=> f.toString))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByWithOffset(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ - x.productElement(0).asInstanceOf[Int] )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results.
+ filterNot(_.isEmpty)
+ .sortBy(_.head)(Ordering.by(f=> f.toString))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByWithOffsetAndFetch(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ x.productElement(0).asInstanceOf[Int] )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results
+ .filterNot(_.isEmpty)
+ .sortBy(_.head)(Ordering.by(f=> f.toString))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test
+ def testOrderByLimit(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
+
+ implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ (x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+ val result = results
+ .filterNot(_.isEmpty)
+ .sortBy(_.head)(Ordering.by(f=> f.toString))
+ .reduceLeft(_ ++ _)
+
+ TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLimitWithoutOrder(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
new file mode 100644
index 0000000..aabc62a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/TableWithSQLITCase.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableWithSQLITCase(
+ mode: TestExecutionMode,
+ configMode: TableConfigMode)
+ extends TableProgramsTestBase(mode, configMode) {
+
+ @Test
+ def testSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
+
+ val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+
+ val expected = "15,65,12"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testTableSQLTable(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val t1 = ds.filter('a > 9)
+
+ tEnv.registerTable("MyTable", t1)
+
+ val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+
+ val expected = "16,60,12"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testMultipleSQLQueries(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
+ val result1 = tEnv.sql(sqlQuery)
+ tEnv.registerTable("ResTable", result1)
+
+ val sqlQuery2 = "SELECT count(aa) FROM ResTable"
+ val result2 = tEnv.sql(sqlQuery2)
+
+ val expected = "6"
+ val results = result2.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testSelectWithCompositeType(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT MyTable.a2, MyTable.a1._2 FROM MyTable"
+
+ val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
+ tEnv.registerTable("MyTable", ds)
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = "Hello,true\n"
+
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..e091da2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
+import org.apache.flink.table.utils._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+ @Test
+ def testCrossJoin(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c)"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+
+ // test overloading
+
+ val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
+
+ val expected2 = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c, '$')"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery2, expected2)
+ }
+
+ @Test
+ def testLeftOuterJoin(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c)"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "LEFT")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testCustomType(): Unit = {
+ val util = batchTestUtil()
+ val func2 = new TableFunc2
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func2", func2)
+
+ val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func2($cor0.c)"),
+ term("function", func2.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+ "VARCHAR(2147483647) f0, INTEGER f1)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testHierarchyType(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = new HierarchyTableFunction
+ util.addFunction("hierarchy", function)
+
+ val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "hierarchy($cor0.c)"),
+ term("function", function.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+ " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testPojoType(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = new PojoTableFunc
+ util.addFunction("pojo", function)
+
+ val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "pojo($cor0.c)"),
+ term("function", function.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+ " INTEGER age, VARCHAR(2147483647) name)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "name", "age")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testFilter(): Unit = {
+ val util = batchTestUtil()
+ val func2 = new TableFunc2
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func2", func2)
+
+ val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+ "WHERE len > 2"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func2($cor0.c)"),
+ term("function", func2.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+ "VARCHAR(2147483647) f0, INTEGER f1)"),
+ term("joinType", "INNER"),
+ term("condition", ">($1, 2)")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+
+ @Test
+ def testScalarFunction(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+}